This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 853e973212 [Improve][Connector-V2] Close all ResultSet after used (#7389)
853e973212 is described below

commit 853e973212956751250d8b1212f413cfcf3ca0fd
Author: Jia Fan <[email protected]>
AuthorDate: Fri Aug 16 15:41:59 2024 +0800

    [Improve][Connector-V2] Close all ResultSet after used (#7389)
    
    * [Improve][Connector-V2] Close all ResultSet after used
    
    * update
---
 .../cdc/base/dialect/JdbcDataSourceDialect.java    | 93 +++++++++++-----------
 .../sink/client/ClickhouseSinkWriter.java          |  5 +-
 .../connectors/doris/catalog/DorisCatalog.java     | 80 ++++++++++---------
 .../jdbc/catalog/AbstractJdbcCatalog.java          |  9 ++-
 .../catalog/oceanbase/OceanBaseMySqlCatalog.java   | 10 ++-
 .../seatunnel/jdbc/catalog/utils/CatalogUtils.java | 88 ++++++++++----------
 .../jdbc/catalog/utils/JdbcColumnConverter.java    | 34 +++++---
 .../jdbc/internal/dialect/hive/HiveDialect.java    |  7 +-
 .../jdbc/internal/dialect/iris/IrisDialect.java    |  6 +-
 .../dialect/tablestore/TablestoreDialect.java      |  6 +-
 .../jdbc/internal/dialect/xugu/XuguDialect.java    |  6 +-
 .../starrocks/catalog/StarRocksCatalog.java        | 39 +++++----
 .../tdengine/sink/TDengineSinkWriter.java          |  9 ++-
 .../src/test/java/mongodb/MongodbCDCIT.java        |  5 +-
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  6 +-
 .../connector/cdc/sqlserver/SqlServerCDCIT.java    |  5 +-
 .../seatunnel/clickhouse/ClickhouseIT.java         | 59 +++++++-------
 .../clickhouse/ClickhouseSinkCDCChangelogIT.java   |  8 +-
 .../e2e/connector/doris/AbstractDorisIT.java       |  5 +-
 .../e2e/connector/doris/DorisCDCSinkIT.java        |  8 +-
 .../seatunnel/e2e/connector/doris/DorisIT.java     | 15 ++--
 .../seatunnel/jdbc/JdbcPostgresIdentifierIT.java   |  5 +-
 .../seatunnel/jdbc/JdbcOceanBaseITBase.java        |  9 +--
 .../connectors/seatunnel/jdbc/JdbcPostgresIT.java  |  4 +-
 .../seatunnel/jdbc/JdbcSinkCDCChangelogIT.java     |  4 +-
 .../seatunnel/jdbc/JdbcMySqlCreateTableIT.java     | 15 ++--
 .../seatunnel/jdbc/JdbcSqlServerCreateTableIT.java | 15 ++--
 .../connectors/seatunnel/jdbc/JdbcDorisIT.java     |  6 +-
 .../connectors/seatunnel/jdbc/JdbcDorisdbIT.java   |  6 +-
 .../connectors/seatunnel/jdbc/JdbcIrisIT.java      |  8 +-
 .../e2e/connector/kafka/KafkaFormatIT.java         |  7 +-
 .../e2e/connector/pulsar/CanalToPulsarIT.java      |  5 +-
 .../connector/starrocks/StarRocksCDCSinkIT.java    |  4 +-
 .../e2e/connector/starrocks/StarRocksIT.java       |  6 +-
 .../e2e/connector/tdengine/TDengineIT.java         |  4 +-
 35 files changed, 326 insertions(+), 275 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 80c9c81cca..05e9a89c04 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -77,22 +77,24 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
 
         DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
 
+        // seq -> column name
+        List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
+        String pkName = null;
+
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs =
-                metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table());
 
-        // seq -> column name
-        List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
-        String pkName = null;
-        while (rs.next()) {
-            // all the PK_NAME should be the same
-            pkName = rs.getString("PK_NAME");
-            String columnName = rs.getString("COLUMN_NAME");
-            int keySeq = rs.getInt("KEY_SEQ");
-            // KEY_SEQ is 1-based index
-            primaryKeyColumns.add(Pair.of(keySeq, columnName));
+        try (ResultSet rs =
+                metaData.getPrimaryKeys(tableId.catalog(), tableId.schema(), tableId.table())) {
+            while (rs.next()) {
+                // all the PK_NAME should be the same
+                pkName = rs.getString("PK_NAME");
+                String columnName = rs.getString("COLUMN_NAME");
+                int keySeq = rs.getInt("KEY_SEQ");
+                // KEY_SEQ is 1-based index
+                primaryKeyColumns.add(Pair.of(keySeq, columnName));
+            }
         }
         // initialize size
         List<String> pkFields =
@@ -121,41 +123,42 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
             throws SQLException {
         DatabaseMetaData metaData = jdbcConnection.connection().getMetaData();
 
-        ResultSet resultSet =
+        try (ResultSet resultSet =
                 metaData.getIndexInfo(
-                        tableId.catalog(), tableId.schema(), tableId.table(), false, false);
-        // index name -> index
-        Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
-        while (resultSet.next()) {
-            String columnName = resultSet.getString("COLUMN_NAME");
-            if (columnName == null) {
-                continue;
+                        tableId.catalog(), tableId.schema(), tableId.table(), false, false)) {
+            // index name -> index
+            Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+            while (resultSet.next()) {
+                String columnName = resultSet.getString("COLUMN_NAME");
+                if (columnName == null) {
+                    continue;
+                }
+
+                String indexName = resultSet.getString("INDEX_NAME");
+                boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+
+                ConstraintKey constraintKey =
+                        constraintKeyMap.computeIfAbsent(
+                                indexName,
+                                s -> {
+                                    ConstraintKey.ConstraintType constraintType =
+                                            ConstraintKey.ConstraintType.INDEX_KEY;
+                                    if (!noUnique) {
+                                        constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
+                                    }
+                                    return ConstraintKey.of(
+                                            constraintType, indexName, new ArrayList<>());
+                                });
+
+                ConstraintKey.ColumnSortType sortType =
+                        "A".equals(resultSet.getString("ASC_OR_DESC"))
+                                ? ConstraintKey.ColumnSortType.ASC
+                                : ConstraintKey.ColumnSortType.DESC;
+                ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+                        new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
+                constraintKey.getColumnNames().add(constraintKeyColumn);
             }
-
-            String indexName = resultSet.getString("INDEX_NAME");
-            boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
-
-            ConstraintKey constraintKey =
-                    constraintKeyMap.computeIfAbsent(
-                            indexName,
-                            s -> {
-                                ConstraintKey.ConstraintType constraintType =
-                                        ConstraintKey.ConstraintType.INDEX_KEY;
-                                if (!noUnique) {
-                                    constraintType = ConstraintKey.ConstraintType.UNIQUE_KEY;
-                                }
-                                return ConstraintKey.of(
-                                        constraintType, indexName, new ArrayList<>());
-                            });
-
-            ConstraintKey.ColumnSortType sortType =
-                    "A".equals(resultSet.getString("ASC_OR_DESC"))
-                            ? ConstraintKey.ColumnSortType.ASC
-                            : ConstraintKey.ColumnSortType.DESC;
-            ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
-                    new ConstraintKey.ConstraintKeyColumn(columnName, sortType);
-            constraintKey.getColumnNames().add(constraintKeyColumn);
+            return new ArrayList<>(constraintKeyMap.values());
         }
-        return new ArrayList<>(constraintKeyMap.values());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 74119d5b46..b5f1505d11 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -210,8 +210,9 @@ public class ClickhouseSinkWriter
             return false;
         }
         String configKey = "allow_experimental_lightweight_delete";
-        try (Statement stmt = clickhouseConnection.createStatement()) {
-            ResultSet resultSet = stmt.executeQuery("SHOW SETTINGS ILIKE '%" + 
configKey + "%'");
+        try (Statement stmt = clickhouseConnection.createStatement();
+                ResultSet resultSet =
+                        stmt.executeQuery("SHOW SETTINGS ILIKE '%" + configKey 
+ "%'")) {
             while (resultSet.next()) {
                 String name = resultSet.getString("name");
                 if (name.equalsIgnoreCase(configKey)) {
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
index e4d5aea5d6..146d364652 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/catalog/DorisCatalog.java
@@ -148,8 +148,9 @@ public class DorisCatalog implements Catalog {
     private String getDorisVersion() throws SQLException {
         String dorisVersion = null;
         try (PreparedStatement preparedStatement =
-                
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY)) {
-            ResultSet resultSet = preparedStatement.executeQuery();
+                        
conn.prepareStatement(DorisCatalogUtil.QUERY_DORIS_VERSION_QUERY);
+                ResultSet resultSet = preparedStatement.executeQuery()) {
+
             while (resultSet.next()) {
                 dorisVersion = resultSet.getString(2);
             }
@@ -180,8 +181,9 @@ public class DorisCatalog implements Catalog {
     public boolean databaseExists(String databaseName) throws CatalogException 
{
         try (PreparedStatement ps = 
conn.prepareStatement(DorisCatalogUtil.DATABASE_QUERY)) {
             ps.setString(1, databaseName);
-            ResultSet rs = ps.executeQuery();
-            return rs.next();
+            try (ResultSet rs = ps.executeQuery()) {
+                return rs.next();
+            }
         } catch (SQLException e) {
             throw new CatalogException("check database exists failed", e);
         }
@@ -190,8 +192,8 @@ public class DorisCatalog implements Catalog {
     @Override
     public List<String> listDatabases() throws CatalogException {
         List<String> databases = new ArrayList<>();
-        try (PreparedStatement ps = 
conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY)) {
-            ResultSet rs = ps.executeQuery();
+        try (PreparedStatement ps = 
conn.prepareStatement(DorisCatalogUtil.ALL_DATABASES_QUERY);
+                ResultSet rs = ps.executeQuery()) {
             while (rs.next()) {
                 String database = rs.getString(1);
                 databases.add(database);
@@ -210,10 +212,11 @@ public class DorisCatalog implements Catalog {
         try (PreparedStatement ps =
                 
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_DATABASE_QUERY)) {
             ps.setString(1, databaseName);
-            ResultSet rs = ps.executeQuery();
-            while (rs.next()) {
-                String table = rs.getString(1);
-                tables.add(table);
+            try (ResultSet rs = ps.executeQuery()) {
+                while (rs.next()) {
+                    String table = rs.getString(1);
+                    tables.add(table);
+                }
             }
         } catch (SQLException e) {
             throw new CatalogException(
@@ -229,8 +232,9 @@ public class DorisCatalog implements Catalog {
                 
conn.prepareStatement(DorisCatalogUtil.TABLES_QUERY_WITH_IDENTIFIER_QUERY)) {
             ps.setString(1, tablePath.getDatabaseName());
             ps.setString(2, tablePath.getTableName());
-            ResultSet rs = ps.executeQuery();
-            return rs.next();
+            try (ResultSet rs = ps.executeQuery()) {
+                return rs.next();
+            }
         } catch (SQLException e) {
             throw new CatalogException(
                     String.format("check table [%s] exists failed", 
tablePath.getFullName()), e);
@@ -248,18 +252,19 @@ public class DorisCatalog implements Catalog {
         try (PreparedStatement ps = 
conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
             ps.setString(1, tablePath.getDatabaseName());
             ps.setString(2, tablePath.getTableName());
-            ResultSet rs = ps.executeQuery();
-            Map<String, String> options = connectorOptions();
-            buildTableSchemaWithErrorCheck(
-                    tablePath, rs, builder, options, Collections.emptyList());
-            return CatalogTable.of(
-                    TableIdentifier.of(
-                            catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName()),
-                    builder.build(),
-                    options,
-                    Collections.emptyList(),
-                    "",
-                    catalogName);
+            try (ResultSet rs = ps.executeQuery()) {
+                Map<String, String> options = connectorOptions();
+                buildTableSchemaWithErrorCheck(
+                        tablePath, rs, builder, options, 
Collections.emptyList());
+                return CatalogTable.of(
+                        TableIdentifier.of(
+                                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName()),
+                        builder.build(),
+                        options,
+                        Collections.emptyList(),
+                        "",
+                        catalogName);
+            }
         } catch (SeaTunnelRuntimeException e) {
             throw e;
         } catch (Exception e) {
@@ -279,17 +284,18 @@ public class DorisCatalog implements Catalog {
         try (PreparedStatement ps = 
conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
             ps.setString(1, tablePath.getDatabaseName());
             ps.setString(2, tablePath.getTableName());
-            ResultSet rs = ps.executeQuery();
-            Map<String, String> options = connectorOptions();
-            buildTableSchemaWithErrorCheck(tablePath, rs, builder, options, 
fieldNames);
-            return CatalogTable.of(
-                    TableIdentifier.of(
-                            catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName()),
-                    builder.build(),
-                    options,
-                    Collections.emptyList(),
-                    "",
-                    catalogName);
+            try (ResultSet rs = ps.executeQuery()) {
+                Map<String, String> options = connectorOptions();
+                buildTableSchemaWithErrorCheck(tablePath, rs, builder, 
options, fieldNames);
+                return CatalogTable.of(
+                        TableIdentifier.of(
+                                catalogName, tablePath.getDatabaseName(), 
tablePath.getTableName()),
+                        builder.build(),
+                        options,
+                        Collections.emptyList(),
+                        "",
+                        catalogName);
+            }
         } catch (SeaTunnelRuntimeException e) {
             throw e;
         } catch (Exception e) {
@@ -480,8 +486,8 @@ public class DorisCatalog implements Catalog {
     public boolean isExistsData(TablePath tablePath) {
         String tableName = tablePath.getFullName();
         String sql = String.format("select * from %s limit 1;", tableName);
-        try (PreparedStatement ps = conn.prepareStatement(sql)) {
-            ResultSet resultSet = ps.executeQuery();
+        try (PreparedStatement ps = conn.prepareStatement(sql);
+                ResultSet resultSet = ps.executeQuery()) {
             return resultSet.next();
         } catch (SQLException e) {
             throw new CatalogException(String.format("Failed executeSql error 
%s", sql), e);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
index 210bb779e0..247a68c62d 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/AbstractJdbcCatalog.java
@@ -570,9 +570,9 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
 
     protected List<String> queryString(String url, String sql, 
ResultSetConsumer<String> consumer)
             throws SQLException {
-        try (PreparedStatement ps = getConnection(url).prepareStatement(sql)) {
+        try (PreparedStatement ps = getConnection(url).prepareStatement(sql);
+                ResultSet rs = ps.executeQuery()) {
             List<String> result = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
             while (rs.next()) {
                 String value = consumer.apply(rs);
                 if (value != null) {
@@ -643,8 +643,9 @@ public abstract class AbstractJdbcCatalog implements 
Catalog {
         String dbUrl = getUrlFromDatabaseName(tablePath.getDatabaseName());
         Connection connection = getConnection(dbUrl);
         String sql = getExistDataSql(tablePath);
-        try (PreparedStatement ps = connection.prepareStatement(sql)) {
-            ResultSet resultSet = ps.executeQuery();
+        try (PreparedStatement ps = connection.prepareStatement(sql);
+                ResultSet resultSet = ps.executeQuery()) {
+
             return resultSet.next();
         } catch (SQLException e) {
             throw new CatalogException(String.format("Failed executeSql error 
%s", sql), e);
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
index b876e33cc8..ceeff2587f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oceanbase/OceanBaseMySqlCatalog.java
@@ -208,10 +208,12 @@ public class OceanBaseMySqlCatalog extends 
AbstractJdbcCatalog {
     @Override
     public CatalogTable getTable(String sqlQuery) throws SQLException {
         Connection defaultConnection = getConnection(defaultUrl);
-        Statement statement = defaultConnection.createStatement();
-        ResultSetMetaData metaData = 
statement.executeQuery(sqlQuery).getMetaData();
-        return CatalogUtils.getCatalogTable(
-                metaData, new OceanBaseMySqlTypeMapper(typeConverter), 
sqlQuery);
+        try (Statement statement = defaultConnection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sqlQuery)) {
+            ResultSetMetaData metaData = resultSet.getMetaData();
+            return CatalogUtils.getCatalogTable(
+                    metaData, new OceanBaseMySqlTypeMapper(typeConverter), 
sqlQuery);
+        }
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
index 6f8574401f..bb224c4624 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java
@@ -106,22 +106,23 @@ public class CatalogUtils {
         // According to the Javadoc of 
java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by 
KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs =
+        // seq -> column name
+        List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
+        String pkName = null;
+        try (ResultSet rs =
                 metaData.getPrimaryKeys(
                         tablePath.getDatabaseName(),
                         tablePath.getSchemaName(),
-                        tablePath.getTableName());
+                        tablePath.getTableName())) {
 
-        // seq -> column name
-        List<Pair<Integer, String>> primaryKeyColumns = new ArrayList<>();
-        String pkName = null;
-        while (rs.next()) {
-            String columnName = rs.getString("COLUMN_NAME");
-            // all the PK_NAME should be the same
-            pkName = cleanKeyName(rs.getString("PK_NAME"));
-            int keySeq = rs.getInt("KEY_SEQ");
-            // KEY_SEQ is 1-based index
-            primaryKeyColumns.add(Pair.of(keySeq, columnName));
+            while (rs.next()) {
+                String columnName = rs.getString("COLUMN_NAME");
+                // all the PK_NAME should be the same
+                pkName = cleanKeyName(rs.getString("PK_NAME"));
+                int keySeq = rs.getInt("KEY_SEQ");
+                // KEY_SEQ is 1-based index
+                primaryKeyColumns.add(Pair.of(keySeq, columnName));
+            }
         }
         // initialize size
         List<String> pkFields =
@@ -139,45 +140,46 @@ public class CatalogUtils {
     public static List<ConstraintKey> getConstraintKeys(
             DatabaseMetaData metadata, TablePath tablePath) throws 
SQLException {
         // We set approximate to true to avoid querying the statistics table, 
which is slow.
-        ResultSet resultSet =
+        try (ResultSet resultSet =
                 metadata.getIndexInfo(
                         tablePath.getDatabaseName(),
                         tablePath.getSchemaName(),
                         tablePath.getTableName(),
                         false,
-                        true);
-        // index name -> index
-        Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
-        while (resultSet.next()) {
-            String columnName = resultSet.getString("COLUMN_NAME");
-            if (columnName == null) {
-                continue;
-            }
-            String indexName = cleanKeyName(resultSet.getString("INDEX_NAME"));
-            boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
+                        true)) {
+            // index name -> index
+            Map<String, ConstraintKey> constraintKeyMap = new HashMap<>();
+            while (resultSet.next()) {
+                String columnName = resultSet.getString("COLUMN_NAME");
+                if (columnName == null) {
+                    continue;
+                }
+                String indexName = 
cleanKeyName(resultSet.getString("INDEX_NAME"));
+                boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
 
-            ConstraintKey constraintKey =
-                    constraintKeyMap.computeIfAbsent(
-                            indexName,
-                            s -> {
-                                ConstraintKey.ConstraintType constraintType =
-                                        ConstraintKey.ConstraintType.INDEX_KEY;
-                                if (!noUnique) {
-                                    constraintType = 
ConstraintKey.ConstraintType.UNIQUE_KEY;
-                                }
-                                return ConstraintKey.of(
-                                        constraintType, indexName, new 
ArrayList<>());
-                            });
+                ConstraintKey constraintKey =
+                        constraintKeyMap.computeIfAbsent(
+                                indexName,
+                                s -> {
+                                    ConstraintKey.ConstraintType 
constraintType =
+                                            
ConstraintKey.ConstraintType.INDEX_KEY;
+                                    if (!noUnique) {
+                                        constraintType = 
ConstraintKey.ConstraintType.UNIQUE_KEY;
+                                    }
+                                    return ConstraintKey.of(
+                                            constraintType, indexName, new 
ArrayList<>());
+                                });
 
-            ConstraintKey.ColumnSortType sortType =
-                    "A".equals(resultSet.getString("ASC_OR_DESC"))
-                            ? ConstraintKey.ColumnSortType.ASC
-                            : ConstraintKey.ColumnSortType.DESC;
-            ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
-                    new ConstraintKey.ConstraintKeyColumn(columnName, 
sortType);
-            constraintKey.getColumnNames().add(constraintKeyColumn);
+                ConstraintKey.ColumnSortType sortType =
+                        "A".equals(resultSet.getString("ASC_OR_DESC"))
+                                ? ConstraintKey.ColumnSortType.ASC
+                                : ConstraintKey.ColumnSortType.DESC;
+                ConstraintKey.ConstraintKeyColumn constraintKeyColumn =
+                        new ConstraintKey.ConstraintKeyColumn(columnName, 
sortType);
+                constraintKey.getColumnNames().add(constraintKeyColumn);
+            }
+            return new ArrayList<>(constraintKeyMap.values());
         }
-        return new ArrayList<>(constraintKeyMap.values());
     }
 
     private static String cleanKeyName(String keyName) {
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
index 87b0f54d20..664141b450 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/JdbcColumnConverter.java
@@ -72,25 +72,33 @@ public class JdbcColumnConverter {
 
     public static List<Column> convert(DatabaseMetaData metadata, TablePath 
tablePath)
             throws SQLException {
-        ResultSet columnsResultSet =
+        List<Column> columns = new ArrayList<>();
+
+        try (ResultSet columnsResultSet =
                 metadata.getColumns(
                         tablePath.getDatabaseName(),
                         tablePath.getSchemaName(),
                         tablePath.getTableName(),
-                        null);
+                        null)) {
 
-        List<Column> columns = new ArrayList<>();
-        while (columnsResultSet.next()) {
-            String columnName = columnsResultSet.getString("COLUMN_NAME");
-            int jdbcType = columnsResultSet.getInt("DATA_TYPE");
-            String nativeType = columnsResultSet.getString("TYPE_NAME");
-            int columnSize = columnsResultSet.getInt("COLUMN_SIZE");
-            int decimalDigits = columnsResultSet.getInt("DECIMAL_DIGITS");
-            int nullable = columnsResultSet.getInt("NULLABLE");
+            while (columnsResultSet.next()) {
+                String columnName = columnsResultSet.getString("COLUMN_NAME");
+                int jdbcType = columnsResultSet.getInt("DATA_TYPE");
+                String nativeType = columnsResultSet.getString("TYPE_NAME");
+                int columnSize = columnsResultSet.getInt("COLUMN_SIZE");
+                int decimalDigits = columnsResultSet.getInt("DECIMAL_DIGITS");
+                int nullable = columnsResultSet.getInt("NULLABLE");
 
-            Column column =
-                    convert(columnName, jdbcType, nativeType, nullable, 
columnSize, decimalDigits);
-            columns.add(column);
+                Column column =
+                        convert(
+                                columnName,
+                                jdbcType,
+                                nativeType,
+                                nullable,
+                                columnSize,
+                                decimalDigits);
+                columns.add(column);
+            }
         }
         return columns;
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
index 08e68632f7..adee0ba38f 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/hive/HiveDialect.java
@@ -25,6 +25,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
 
 import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.Optional;
@@ -55,7 +57,10 @@ public class HiveDialect implements JdbcDialect {
     @Override
     public ResultSetMetaData getResultSetMetaData(Connection conn, String query)
             throws SQLException {
-        return conn.prepareStatement(query).executeQuery().getMetaData();
+        try (PreparedStatement preparedStatement = conn.prepareStatement(query);
+                ResultSet resultSet = preparedStatement.executeQuery()) {
+            return resultSet.getMetaData();
+        }
     }
 
     @Override
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
index 5be550cdef..e64974081d 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/iris/IrisDialect.java
@@ -193,7 +193,9 @@ public class IrisDialect implements JdbcDialect {
     @Override
     public ResultSetMetaData getResultSetMetaData(Connection conn, String query)
             throws SQLException {
-        PreparedStatement ps = conn.prepareStatement(query);
-        return ps.executeQuery().getMetaData();
+        try (PreparedStatement ps = conn.prepareStatement(query);
+                ResultSet resultSet = ps.executeQuery()) {
+            return resultSet.getMetaData();
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
index 7a1edbeee7..9506ffd997 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/tablestore/TablestoreDialect.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDiale
 
 import java.sql.Connection;
 import java.sql.PreparedStatement;
+import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
 import java.util.Optional;
@@ -61,6 +62,9 @@ public class TablestoreDialect implements JdbcDialect {
     @Override
     public ResultSetMetaData getResultSetMetaData(Connection conn, String query)
             throws SQLException {
-        return conn.prepareStatement(query).executeQuery().getMetaData();
+        try (PreparedStatement preparedStatement = conn.prepareStatement(query);
+                ResultSet resultSet = preparedStatement.executeQuery()) {
+            return resultSet.getMetaData();
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
index 7340b099b8..adf2cf2190 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/xugu/XuguDialect.java
@@ -225,7 +225,9 @@ public class XuguDialect implements JdbcDialect {
     @Override
     public ResultSetMetaData getResultSetMetaData(Connection conn, String query)
             throws SQLException {
-        PreparedStatement ps = conn.prepareStatement(query);
-        return ps.executeQuery().getMetaData();
+        try (PreparedStatement ps = conn.prepareStatement(query);
+                ResultSet resultSet = ps.executeQuery()) {
+            return resultSet.getMetaData();
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
index 8a14b08efe..e71aae291a 100644
--- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
+++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCatalog.java
@@ -56,6 +56,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -107,12 +108,10 @@ public class StarRocksCatalog implements Catalog {
 
     @Override
     public List<String> listDatabases() throws CatalogException {
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-
-            PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
-
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SHOW DATABASES;");
+                ResultSet rs = ps.executeQuery()) {
             List<String> databases = new ArrayList<>();
-            ResultSet rs = ps.executeQuery();
 
             while (rs.next()) {
                 String databaseName = rs.getString(1);
@@ -136,11 +135,10 @@ public class StarRocksCatalog implements Catalog {
         }
 
         try (Connection conn =
-                DriverManager.getConnection(
-                        urlInfo.getUrlWithDatabase(databaseName), username, pwd)) {
-            PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
-
-            ResultSet rs = ps.executeQuery();
+                        DriverManager.getConnection(
+                                urlInfo.getUrlWithDatabase(databaseName), username, pwd);
+                PreparedStatement ps = conn.prepareStatement("SHOW TABLES;");
+                ResultSet rs = ps.executeQuery()) {
 
             List<String> tables = new ArrayList<>();
 
@@ -259,9 +257,10 @@ public class StarRocksCatalog implements Catalog {
     }
 
     public boolean isExistsData(TablePath tablePath) {
-        try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            String sql = String.format("select * from %s limit 1", tablePath.getFullName());
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        String sql = String.format("select * from %s limit 1", tablePath.getFullName());
+        try (Connection connection = DriverManager.getConnection(defaultUrl, username, pwd);
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             if (resultSet == null) {
                 return false;
             }
@@ -455,13 +454,13 @@ public class StarRocksCatalog implements Catalog {
     protected Optional<PrimaryKey> getPrimaryKey(String schema, String table) throws SQLException {
 
         List<String> pkFields = new ArrayList<>();
-        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
-            ResultSet rs =
-                    conn.createStatement()
-                            .executeQuery(
-                                    String.format(
-                                            "SELECT COLUMN_NAME FROM information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
-                                            schema, table));
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd);
+                ResultSet rs =
+                        conn.createStatement()
+                                .executeQuery(
+                                        String.format(
+                                                "SELECT COLUMN_NAME FROM information_schema.columns where TABLE_SCHEMA = '%s' AND TABLE_NAME = '%s' AND COLUMN_KEY = 'PRI' ORDER BY ORDINAL_POSITION",
+                                                schema, table))) {
             while (rs.next()) {
                 String columnName = rs.getString("COLUMN_NAME");
                 pkFields.add(columnName);
diff --git a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
index 5c7b13c550..ed05e64937 100644
--- a/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-tdengine/src/main/java/org/apache/seatunnel/connectors/seatunnel/tdengine/sink/TDengineSinkWriter.java
@@ -71,10 +71,11 @@ public class TDengineSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
         // check td driver whether exist and if not, try to register
         checkDriverExist(jdbcUrl);
         conn = DriverManager.getConnection(jdbcUrl);
-        try (Statement statement = conn.createStatement()) {
-            final ResultSet metaResultSet =
-                    statement.executeQuery(
-                            "desc " + config.getDatabase() + "." + config.getStable());
+        try (Statement statement = conn.createStatement();
+                final ResultSet metaResultSet =
+                        statement.executeQuery(
+                                "desc " + config.getDatabase() + "." + config.getStable())) {
+
             while (metaResultSet.next()) {
                 if (StringUtils.equals("TAG", metaResultSet.getString("note"))) {
                     tagsNum++;
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
index 01a4c0a0f5..3789731354 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mongodb-e2e/src/test/java/mongodb/MongodbCDCIT.java
@@ -233,8 +233,9 @@ public class MongodbCDCIT extends TestSuiteBase implements TestResource {
     }
 
     private List<List<Object>> querySql() {
-        try (Connection connection = getJdbcConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(MongodbCDCIT.SINK_SQL);
+        try (Connection connection = getJdbcConnection();
+                ResultSet resultSet =
+                        connection.createStatement().executeQuery(MongodbCDCIT.SINK_SQL)) {
             List<List<Object>> result = new ArrayList<>();
             int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index 7fab60f9fc..a98a5cd4d2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -44,6 +44,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -500,8 +501,9 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
     }
 
     private List<List<Object>> query(String sql) {
-        try (Connection connection = getJdbcConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             List<List<Object>> result = new ArrayList<>();
             int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 1216c69645..8a1814e6ae 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -375,8 +375,9 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             List<List<Object>> result = new ArrayList<>();
             int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index c0a4254739..66ee281740 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -357,39 +357,40 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource {
         List<String> columnList =
                 Arrays.stream(generateTestDataSet().getKey().getFieldNames())
                         .collect(Collectors.toList());
-        Statement sourceStatement = connection.createStatement();
-        Statement sinkStatement = connection.createStatement();
-        ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
-        ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
-        Assertions.assertEquals(
-                sourceResultSet.getMetaData().getColumnCount(),
-                sinkResultSet.getMetaData().getColumnCount());
-        while (sourceResultSet.next()) {
-            if (sinkResultSet.next()) {
-                for (String column : columnList) {
-                    Object source = sourceResultSet.getObject(column);
-                    Object sink = sinkResultSet.getObject(column);
-                    if (!Objects.deepEquals(source, sink)) {
-                        InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
-                        InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
-                        String sourceValue =
-                                IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
-                        String sinkValue =
-                                IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
-                        Assertions.assertEquals(sourceValue, sinkValue);
+        try (Statement sourceStatement = connection.createStatement();
+                Statement sinkStatement = connection.createStatement();
+                ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) {
+            Assertions.assertEquals(
+                    sourceResultSet.getMetaData().getColumnCount(),
+                    sinkResultSet.getMetaData().getColumnCount());
+            while (sourceResultSet.next()) {
+                if (sinkResultSet.next()) {
+                    for (String column : columnList) {
+                        Object source = sourceResultSet.getObject(column);
+                        Object sink = sinkResultSet.getObject(column);
+                        if (!Objects.deepEquals(source, sink)) {
+                            InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column);
+                            InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column);
+                            String sourceValue =
+                                    IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8);
+                            String sinkValue =
+                                    IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8);
+                            Assertions.assertEquals(sourceValue, sinkValue);
+                        }
+                        Assertions.assertTrue(true);
                     }
-                    Assertions.assertTrue(true);
                 }
             }
+            String columns = String.join(",", generateTestDataSet().getKey().getFieldNames());
+            Assertions.assertTrue(
+                    compare(String.format(CONFIG.getString(COMPARE_SQL), columns, columns)));
         }
-        String columns = String.join(",", generateTestDataSet().getKey().getFieldNames());
-        Assertions.assertTrue(
-                compare(String.format(CONFIG.getString(COMPARE_SQL), columns, columns)));
     }
 
     private Boolean compare(String sql) {
-        try (Statement statement = connection.createStatement()) {
-            ResultSet resultSet = statement.executeQuery(sql);
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             return !resultSet.next();
         } catch (SQLException e) {
             throw new RuntimeException("result compare error", e);
@@ -397,9 +398,9 @@ public class ClickhouseIT extends TestSuiteBase implements TestResource {
     }
 
     private void assertHasData(String table) {
-        try (Statement statement = connection.createStatement()) {
-            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
-            ResultSet source = statement.executeQuery(sql);
+        String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+        try (Statement statement = connection.createStatement();
+                ResultSet source = statement.executeQuery(sql); ) {
             Assertions.assertTrue(source.next());
         } catch (SQLException e) {
             throw new RuntimeException("test clickhouse server image error", e);
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
index 3d46ac8c55..5d9c1c848b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseSinkCDCChangelogIT.java
@@ -189,10 +189,10 @@ public class ClickhouseSinkCDCChangelogIT extends TestSuiteBase implements TestR
 
     private void checkSinkTableRows() throws SQLException {
         Set<List<Object>> actual = new HashSet<>();
-        try (Statement statement = connection.createStatement()) {
-            ResultSet resultSet =
-                    statement.executeQuery(
-                            String.format("select * from %s.%s", DATABASE, SINK_TABLE));
+        try (Statement statement = connection.createStatement();
+                ResultSet resultSet =
+                        statement.executeQuery(
+                                String.format("select * from %s.%s", DATABASE, SINK_TABLE))) {
             while (resultSet.next()) {
                 List<Object> row =
                         Arrays.asList(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
index 458a900b4b..8392f9ae33 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/AbstractDorisIT.java
@@ -94,8 +94,11 @@ public abstract class AbstractDorisIT extends TestSuiteBase implements TestResou
                 DriverManager.getConnection(String.format(URL, container.getHost()), props);
         try (Statement statement = jdbcConnection.createStatement()) {
             statement.execute(SET_SQL);
-            ResultSet resultSet;
+            ResultSet resultSet = null;
             do {
+                if (resultSet != null) {
+                    resultSet.close();
+                }
                 resultSet = statement.executeQuery(SHOW_BE);
             } while (!isBeReady(resultSet, Duration.ofSeconds(1L)));
         }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
index 33108b8b8e..7fa699d998 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCDCSinkIT.java
@@ -155,8 +155,8 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
                 .untilAsserted(
                         () -> {
                             Set<List<Object>> actual = new HashSet<>();
-                            try (Statement sinkStatement = jdbcConnection.createStatement()) {
-                                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+                            try (Statement sinkStatement = jdbcConnection.createStatement();
+                                    ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) {
                                 while (sinkResultSet.next()) {
                                     List<Object> row =
                                             Arrays.asList(
@@ -178,8 +178,8 @@ public class DorisCDCSinkIT extends AbstractDorisIT {
                 .untilAsserted(
                         () -> {
                             Set<List<Object>> actual = new HashSet<>();
-                            try (Statement sinkStatement = jdbcConnection.createStatement()) {
-                                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+                            try (Statement sinkStatement = jdbcConnection.createStatement();
+                                    ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) {
                                 while (sinkResultSet.next()) {
                                     List<Object> row =
                                             Arrays.asList(
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
index 6b7a3a7f48..e9b81100de 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisIT.java
@@ -203,13 +203,14 @@ public class DorisIT extends AbstractDorisIT {
                     conn.prepareStatement(DorisCatalogUtil.TABLE_SCHEMA_QUERY)) {
                 ps.setString(1, sinkDB);
                 ps.setString(2, DUPLICATE_TABLE);
-                ResultSet resultSet = ps.executeQuery();
-                while (resultSet.next()) {
-                    String columnName = resultSet.getString("COLUMN_NAME");
-                    String columnType = resultSet.getString("COLUMN_TYPE");
-                    Assertions.assertEquals(
-                            checkColumnTypeMap.get(columnName).toUpperCase(Locale.ROOT),
-                            columnType.toUpperCase(Locale.ROOT));
+                try (ResultSet resultSet = ps.executeQuery()) {
+                    while (resultSet.next()) {
+                        String columnName = resultSet.getString("COLUMN_NAME");
+                        String columnType = resultSet.getString("COLUMN_TYPE");
+                        Assertions.assertEquals(
+                                checkColumnTypeMap.get(columnName).toUpperCase(Locale.ROOT),
+                                columnType.toUpperCase(Locale.ROOT));
+                    }
                 }
             }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
index a7094044aa..c4037ecce6 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIdentifierIT.java
@@ -345,8 +345,9 @@ public class JdbcPostgresIdentifierIT extends TestSuiteBase implements TestResou
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             List<List<Object>> result = new ArrayList<>();
             int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
index 6cdc38780a..844c2fc00c 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-2/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOceanBaseITBase.java
@@ -51,11 +51,10 @@ public abstract class JdbcOceanBaseITBase extends AbstractJdbcIT {
                 String.format("select * from %s order by 1", getFullTableName(OCEANBASE_SOURCE));
         String sinkSql =
                 String.format("select * from %s order by 1", getFullTableName(OCEANBASE_SINK));
-        try {
-            Statement sourceStatement = connection.createStatement();
-            Statement sinkStatement = connection.createStatement();
-            ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
-            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+        try (Statement sourceStatement = connection.createStatement();
+                Statement sinkStatement = connection.createStatement();
+                ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql);
+                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql)) {
             Assertions.assertEquals(
                     sourceResultSet.getMetaData().getColumnCount(),
                     sinkResultSet.getMetaData().getColumnCount());
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
index 6993b99336..d357d238b7 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcPostgresIT.java
@@ -491,8 +491,8 @@ public class JdbcPostgresIT extends TestSuiteBase implements TestResource {
     }
 
     private List<List<Object>> querySql(String sql) {
-        try (Connection connection = getJdbcConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcConnection();
+                ResultSet resultSet = connection.createStatement().executeQuery(sql)) {
             List<List<Object>> result = new ArrayList<>();
             int columnCount = resultSet.getMetaData().getColumnCount();
             while (resultSet.next()) {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
index dd812efb12..cea5099e3b 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSinkCDCChangelogIT.java
@@ -110,8 +110,8 @@ public class JdbcSinkCDCChangelogIT extends TestSuiteBase implements TestResourc
                         postgreSQLContainer.getJdbcUrl(),
                         postgreSQLContainer.getUsername(),
                         postgreSQLContainer.getPassword())) {
-            try (Statement statement = connection.createStatement()) {
-                ResultSet resultSet = statement.executeQuery("select * from sink");
+            try (Statement statement = connection.createStatement();
+                    ResultSet resultSet = statement.executeQuery("select * from sink")) {
                 while (resultSet.next()) {
                     List<Object> row =
                             Arrays.asList(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
index 30c6783897..bf3c9f654f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcMySqlCreateTableIT.java
@@ -356,8 +356,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase 
implements TestResourc
     }
 
     private boolean checkMysql(String sql) {
-        try (Connection connection = getJdbcMySqlConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcMySqlConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             boolean tableExists = false;
             if (resultSet.next()) {
                 tableExists = resultSet.getBoolean(1);
@@ -369,8 +370,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase 
implements TestResourc
     }
 
     private boolean checkPG(String sql) {
-        try (Connection connection = getJdbcPgConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcPgConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             boolean tableExists = false;
             if (resultSet.next()) {
                 tableExists = resultSet.getBoolean(1);
@@ -382,8 +384,9 @@ public class JdbcMySqlCreateTableIT extends TestSuiteBase 
implements TestResourc
     }
 
     private boolean checkSqlServer(String sql) {
-        try (Connection connection = getJdbcSqlServerConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcSqlServerConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             boolean tableExists = false;
             if (resultSet.next()) {
                 tableExists = resultSet.getInt(1) == 1;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
index ae2e625b15..9c8639160f 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-4/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcSqlServerCreateTableIT.java
@@ -362,8 +362,9 @@ public class JdbcSqlServerCreateTableIT extends 
TestSuiteBase implements TestRes
     }
 
     private boolean checkMysql(String sql) {
-        try (Connection connection = getJdbcMySqlConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcMySqlConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             boolean tableExists = false;
             if (resultSet.next()) {
                 tableExists = resultSet.getBoolean(1);
@@ -375,8 +376,9 @@ public class JdbcSqlServerCreateTableIT extends 
TestSuiteBase implements TestRes
     }
 
     private boolean checkPG(String sql) {
-        try (Connection connection = getJdbcPgConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcPgConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             boolean tableExists = false;
             if (resultSet.next()) {
                 tableExists = resultSet.getBoolean(1);
@@ -388,8 +390,9 @@ public class JdbcSqlServerCreateTableIT extends 
TestSuiteBase implements TestRes
     }
 
     private boolean checkSqlServer(String sql) {
-        try (Connection connection = getJdbcSqlServerConnection()) {
-            ResultSet resultSet = connection.createStatement().executeQuery(sql);
+        try (Connection connection = getJdbcSqlServerConnection();
+                Statement statement = connection.createStatement();
+                ResultSet resultSet = statement.executeQuery(sql)) {
             boolean tableExists = false;
             if (resultSet.next()) {
                 tableExists = resultSet.getInt(1) == 1;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
index 24bd606775..c4477ae9be 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisIT.java
@@ -340,9 +340,9 @@ public class JdbcDorisIT extends TestSuiteBase implements 
TestResource {
     }
 
     private void assertHasData(String table) {
-        try (Statement statement = jdbcConnection.createStatement()) {
-            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
-            ResultSet source = statement.executeQuery(sql);
+        String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+        try (Statement statement = jdbcConnection.createStatement();
+                ResultSet source = statement.executeQuery(sql); ) {
             Assertions.assertTrue(source.next());
         } catch (Exception e) {
             throw new RuntimeException("Test doris server image error", e);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
index 4c6aaa245c..ef8bc78821 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-5/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcDorisdbIT.java
@@ -332,9 +332,9 @@ public class JdbcDorisdbIT extends TestSuiteBase implements 
TestResource {
     }
 
     private void assertHasData(String table) {
-        try (Statement statement = jdbcConnection.createStatement()) {
-            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
-            ResultSet source = statement.executeQuery(sql);
+        String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+        try (Statement statement = jdbcConnection.createStatement();
+                ResultSet source = statement.executeQuery(sql)) {
             Assertions.assertTrue(source.next());
         } catch (Exception e) {
             throw new RuntimeException("Test doris server image error", e);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
index b99c823de8..2a48e16fa5 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-7/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcIrisIT.java
@@ -326,10 +326,10 @@ public class JdbcIrisIT extends AbstractJdbcIT {
     public void testUpsert(TestContainer container) throws IOException, InterruptedException {
         Container.ExecResult execResult = container.executeJob("/jdbc_iris_upsert.conf");
         Assertions.assertEquals(0, execResult.getExitCode(), execResult.getStderr());
-        try (Statement statement = connection.createStatement()) {
-            ResultSet sink =
-                    statement.executeQuery(
-                            "SELECT * FROM test.e2e_upsert_table_sink ORDER BY pk_id");
+        try (Statement statement = connection.createStatement();
+                ResultSet sink =
+                        statement.executeQuery(
+                                "SELECT * FROM test.e2e_upsert_table_sink ORDER BY pk_id")) {
             String[] fieldNames = new String[] {"pk_id", "name", "score"};
             Object[] sinkResult = toArrayResult(sink, fieldNames);
             Assertions.assertEquals(2, sinkResult.length);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
index ec7b0173ff..bbd92de12e 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-kafka-e2e/src/test/java/org/apache/seatunnel/e2e/connector/kafka/KafkaFormatIT.java
@@ -996,11 +996,10 @@ public class KafkaFormatIT extends TestSuiteBase 
implements TestResource {
                         POSTGRESQL_CONTAINER.getJdbcUrl(),
                         POSTGRESQL_CONTAINER.getUsername(),
                         POSTGRESQL_CONTAINER.getPassword())) {
-            try (Statement statement = connection.createStatement()) {
+            try (Statement statement = connection.createStatement();
+                    ResultSet resultSet =
+                            statement.executeQuery("select * from " + tableName + " order by id")) {
                 PostgresJdbcRowConverter postgresJdbcRowConverter = new PostgresJdbcRowConverter();
-                ResultSet resultSet =
-                        statement.executeQuery("select * from " + tableName + " order by id");
-
                 while (resultSet.next()) {
                     SeaTunnelRow row =
                             postgresJdbcRowConverter.toInternal(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
index f273296ec8..d9a5774d88 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/java/org/apache/seatunnel/e2e/connector/pulsar/CanalToPulsarIT.java
@@ -309,8 +309,9 @@ public class CanalToPulsarIT extends TestSuiteBase 
implements TestResource {
                         POSTGRESQL_CONTAINER.getJdbcUrl(),
                         POSTGRESQL_CONTAINER.getUsername(),
                         POSTGRESQL_CONTAINER.getPassword())) {
-            try (Statement statement = connection.createStatement()) {
-                ResultSet resultSet = statement.executeQuery("SELECT * FROM sink ORDER BY id");
+            try (Statement statement = connection.createStatement();
+                    ResultSet resultSet =
+                            statement.executeQuery("SELECT * FROM sink ORDER BY id"); ) {
                 while (resultSet.next()) {
                     List<Object> row =
                             Arrays.asList(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
index 1a16662f99..9b83254cc6 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java
@@ -144,8 +144,8 @@ public class StarRocksCDCSinkIT extends TestSuiteBase 
implements TestResource {
 
         String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE);
         Set<List<Object>> actual = new HashSet<>();
-        try (Statement sinkStatement = jdbcConnection.createStatement()) {
-            ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql);
+        try (Statement sinkStatement = jdbcConnection.createStatement();
+                ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); ) {
             while (sinkResultSet.next()) {
                 List<Object> row =
                         Arrays.asList(
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index a536cf0231..6f9e41ba02 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -334,9 +334,9 @@ public class StarRocksIT extends TestSuiteBase implements 
TestResource {
     }
 
     private void assertHasData(String table) {
-        try (Statement statement = jdbcConnection.createStatement()) {
-            String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
-            ResultSet source = statement.executeQuery(sql);
+        String sql = String.format("select * from %s.%s limit 1", DATABASE, table);
+        try (Statement statement = jdbcConnection.createStatement();
+                ResultSet source = statement.executeQuery(sql)) {
             Assertions.assertTrue(source.next());
         } catch (Exception e) {
             throw new RuntimeException("test starrocks server image error", e);
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
index 10724ac2c6..a96d34bd0c 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-tdengine-e2e/src/test/java/org/apache/seatunnel/e2e/connector/tdengine/TDengineIT.java
@@ -136,8 +136,8 @@ public class TDengineIT extends TestSuiteBase implements 
TestResource {
     @SneakyThrows
     private long readSinkDataset() {
         long rowCount;
-        try (Statement stmt = connection2.createStatement()) {
-            ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;");
+        try (Statement stmt = connection2.createStatement();
+                ResultSet resultSet = stmt.executeQuery("select count(1) from power2.meters2;"); ) {
             resultSet.next();
             rowCount = resultSet.getLong(1);
         }

Reply via email to