This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 4b01a607091 HIVE-27958: Refactor DirectSqlUpdatePart class (Wechar Yu, reviewed by Attila Turoczy, Denys Kuzmenko) 4b01a607091 is described below commit 4b01a607091581ac9bdb372f8b47c1efca4d4bb4 Author: Wechar Yu <yuwq1...@gmail.com> AuthorDate: Tue Feb 6 17:15:18 2024 +0800 HIVE-27958: Refactor DirectSqlUpdatePart class (Wechar Yu, reviewed by Attila Turoczy, Denys Kuzmenko) Closes #5003 --- .../hadoop/hive/metastore/DatabaseProduct.java | 23 +++ .../hadoop/hive/metastore/DirectSqlUpdatePart.java | 192 +++++++-------------- .../hive/metastore/txn/retry/SqlRetryHandler.java | 27 +-- 3 files changed, 87 insertions(+), 155 deletions(-) diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java index 642057bd69a..b2b20503d24 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DatabaseProduct.java @@ -27,6 +27,7 @@ import java.util.EnumMap; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Stream; import org.apache.commons.lang3.exception.ExceptionUtils; @@ -57,6 +58,11 @@ public class DatabaseProduct implements Configurable { DeadlineException.class }; + /** + * Derby specific concurrency control + */ + private static final ReentrantLock derbyLock = new ReentrantLock(true); + public enum DbType {DERBY, MYSQL, POSTGRES, ORACLE, SQLSERVER, CUSTOM, UNDEFINED}; public DbType dbType; @@ -776,4 +782,21 @@ public class DatabaseProduct implements Configurable { public void setConf(Configuration c) { myConf = c; } + + /** + * lockInternal() and {@link #unlockInternal()} are used to serialize those operations that require + * Select ... For Update to sequence operations properly. In practice that means when running + * with Derby database. See more notes at class level. + */ + public void lockInternal() { + if (isDERBY()) { + derbyLock.lock(); + } + } + + public void unlockInternal() { + if (isDERBY()) { + derbyLock.unlock(); + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java index 67c293ee64f..441ce26ac6d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/DirectSqlUpdatePart.java @@ -67,7 +67,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; import static org.apache.hadoop.hive.common.StatsSetupConst.COLUMN_STATS_ACCURATE; @@ -92,8 +91,6 @@ class DirectSqlUpdatePart { private final int maxBatchSize; private final SQLGenerator sqlGenerator; - private static final ReentrantLock derbyLock = new ReentrantLock(true); - public DirectSqlUpdatePart(PersistenceManager pm, Configuration conf, DatabaseProduct dbType, int batchSize) { this.pm = pm; @@ -103,23 +100,6 @@ class DirectSqlUpdatePart { sqlGenerator = new SQLGenerator(dbType, conf); } - /** - * {@link #lockInternal()} and {@link #unlockInternal()} are used to serialize those operations that require - * Select ... For Update to sequence operations properly. In practice that means when running - * with Derby database. See more notes at class level. - */ - private void lockInternal() { - if(dbType.isDERBY()) { - derbyLock.lock(); - } - } - - private void unlockInternal() { - if(dbType.isDERBY()) { - derbyLock.unlock(); - } - } - void rollbackDBConn(Connection dbConn) { try { if (dbConn != null && !dbConn.isClosed()) dbConn.rollback(); @@ -138,43 +118,16 @@ class DirectSqlUpdatePart { } } - void closeStmt(Statement stmt) { - try { - if (stmt != null && !stmt.isClosed()) stmt.close(); - } catch (SQLException e) { - LOG.warn("Failed to close statement ", e); - } - } - - void close(ResultSet rs) { - try { - if (rs != null && !rs.isClosed()) { - rs.close(); - } - } - catch(SQLException ex) { - LOG.warn("Failed to close statement ", ex); - } - } - static String quoteString(String input) { return "'" + input + "'"; } - void close(ResultSet rs, Statement stmt, JDOConnection dbConn) { - close(rs); - closeStmt(stmt); - closeDbConn(dbConn); - } - private void populateInsertUpdateMap(Map<PartitionInfo, ColumnStatistics> statsPartInfoMap, Map<PartColNameInfo, MPartitionColumnStatistics> updateMap, Map<PartColNameInfo, MPartitionColumnStatistics>insertMap, Connection dbConn, Table tbl) throws SQLException, MetaException, NoSuchObjectException { StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - Statement statement = null; - ResultSet rs = null; List<String> queries = new ArrayList<>(); Set<PartColNameInfo> selectedParts = new HashSet<>(); @@ -186,16 +139,14 @@ class DirectSqlUpdatePart { TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, partIdList, "\"PART_ID\"", true, false); - for (String query : queries) { - try { - statement = dbConn.createStatement(); - LOG.debug("Going to execute query " + query); - rs = statement.executeQuery(query); - while (rs.next()) { - selectedParts.add(new PartColNameInfo(rs.getLong(1), rs.getString(2), rs.getString(3))); + try (Statement statement = dbConn.createStatement()) { + for (String query : queries) { + LOG.debug("Execute query: " + query); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + selectedParts.add(new PartColNameInfo(rs.getLong(1), rs.getString(2), rs.getString(3))); + } } - } finally { - close(rs, statement, null); } } @@ -246,13 +197,13 @@ class DirectSqlUpdatePart { partIds.add(partColNameInfo.partitionId); pst.addBatch(); if (partIds.size() == maxBatchSize) { - LOG.debug("Going to execute updates on part: {}", partIds); + LOG.debug("Execute updates on part: {}", partIds); verifyUpdates(pst.executeBatch(), partIds); partIds = new ArrayList<>(); } } if (!partIds.isEmpty()) { - LOG.debug("Going to execute updates on part: {}", partIds); + LOG.debug("Execute updates on part: {}", partIds); verifyUpdates(pst.executeBatch(), partIds); } } @@ -270,7 +221,6 @@ class DirectSqlUpdatePart { private void insertIntoPartColStatTable(Map<PartColNameInfo, MPartitionColumnStatistics> insertMap, long maxCsId, Connection dbConn) throws SQLException, MetaException, NoSuchObjectException { - PreparedStatement preparedStatement = null; int numRows = 0; String insert = "INSERT INTO \"PART_COL_STATS\" (\"CS_ID\", \"CAT_NAME\", \"DB_NAME\"," + "\"TABLE_NAME\", \"PARTITION_NAME\", \"COLUMN_NAME\", \"COLUMN_TYPE\", \"PART_ID\"," @@ -279,8 +229,7 @@ class DirectSqlUpdatePart { + " \"HISTOGRAM\", \"AVG_COL_LEN\", \"MAX_COL_LEN\", \"NUM_TRUES\", \"NUM_FALSES\", \"LAST_ANALYZED\", \"ENGINE\") values " + "(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - try { - preparedStatement = dbConn.prepareStatement(insert); + try (PreparedStatement preparedStatement = dbConn.prepareStatement(insert)) { for (Map.Entry entry : insertMap.entrySet()) { PartColNameInfo partColNameInfo = (PartColNameInfo) entry.getKey(); Long partId = partColNameInfo.partitionId; @@ -323,8 +272,6 @@ class DirectSqlUpdatePart { if (numRows != 0) { preparedStatement.executeBatch(); } - } finally { - closeStmt(preparedStatement); } } @@ -332,8 +279,6 @@ class DirectSqlUpdatePart { List<String> queries = new ArrayList<>(); StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - Statement statement = null; - ResultSet rs = null; prefix.append("select \"PART_ID\", \"PARAM_VALUE\" " + " from \"PARTITION_PARAMS\" where " @@ -343,18 +288,17 @@ class DirectSqlUpdatePart { partIdList, "\"PART_ID\"", true, false); Map<Long, String> partIdToParaMap = new HashMap<>(); - for (String query : queries) { - try { - statement = dbConn.createStatement(); - LOG.debug("Going to execute query " + query); - rs = statement.executeQuery(query); - while (rs.next()) { - partIdToParaMap.put(rs.getLong(1), rs.getString(2)); + try (Statement statement = dbConn.createStatement()) { + for (String query : queries) { + LOG.debug("Execute query: " + query); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + partIdToParaMap.put(rs.getLong(1), rs.getString(2)); + } } - } finally { - close(rs, statement, null); } } + return partIdToParaMap; } @@ -367,14 +311,10 @@ class DirectSqlUpdatePart { TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, partIdList, "\"PART_ID\"", false, false); - Statement statement = null; - for (String query : queries) { - try { - statement = dbConn.createStatement(); - LOG.debug("Going to execute update " + query); + try (Statement statement = dbConn.createStatement()) { + for (String query : queries) { + LOG.debug("Execute update: " + query); statement.executeUpdate(query); - } finally { - closeStmt(statement); } } } @@ -503,8 +443,6 @@ class DirectSqlUpdatePart { List<String> queries = new ArrayList<>(); StringBuilder prefix = new StringBuilder(); StringBuilder suffix = new StringBuilder(); - Statement statement = null; - ResultSet rs = null; Map<PartitionInfo, ColumnStatistics> partitionInfoMap = new HashMap<>(); List<String> partKeys = partColStatsMap.keySet().stream().map( @@ -516,20 +454,18 @@ class DirectSqlUpdatePart { TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, partKeys, "\"PART_NAME\"", true, false); - for (String query : queries) { - // Select for update makes sure that the partitions are not modified while the stats are getting updated. - query = sqlGenerator.addForUpdateClause(query); - try { - statement = dbConn.createStatement(); - LOG.debug("Going to execute query <" + query + ">"); - rs = statement.executeQuery(query); - while (rs.next()) { - PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1), - rs.getLong(2), rs.getString(3)); - partitionInfoMap.put(partitionInfo, partColStatsMap.get(rs.getString(3))); + try (Statement statement = dbConn.createStatement()) { + for (String query : queries) { + // Select for update makes sure that the partitions are not modified while the stats are getting updated. + query = sqlGenerator.addForUpdateClause(query); + LOG.debug("Execute query: " + query); + try (ResultSet rs = statement.executeQuery(query)) { + while (rs.next()) { + PartitionInfo partitionInfo = new PartitionInfo(rs.getLong(1), + rs.getLong(2), rs.getString(3)); + partitionInfoMap.put(partitionInfo, partColStatsMap.get(rs.getString(3))); + } } - } finally { - close(rs, statement, null); } } return partitionInfoMap; @@ -556,7 +492,7 @@ class DirectSqlUpdatePart { Connection dbConn = null; boolean committed = false; try { - lockInternal(); + dbType.lockInternal(); jdoConn = pm.getDataStoreConnection(); dbConn = (Connection) (jdoConn.getNativeConnection()); @@ -606,7 +542,7 @@ class DirectSqlUpdatePart { rollbackDBConn(dbConn); } closeDbConn(jdoConn); - unlockInternal(); + dbType.unlockInternal(); } } @@ -615,15 +551,13 @@ class DirectSqlUpdatePart { * @return The CD id before update. */ public long getNextCSIdForMPartitionColumnStatistics(long numStats) throws MetaException { - Statement statement = null; - ResultSet rs = null; long maxCsId = 0; boolean committed = false; Connection dbConn = null; JDOConnection jdoConn = null; try { - lockInternal(); + dbType.lockInternal(); jdoConn = pm.getDataStoreConnection(); dbConn = (Connection) (jdoConn.getNativeConnection()); @@ -637,43 +571,41 @@ class DirectSqlUpdatePart { String query = sqlGenerator.addForUpdateClause("SELECT \"NEXT_VAL\" FROM \"SEQUENCE_TABLE\" " + "WHERE \"SEQUENCE_NAME\"= " + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics")); - LOG.debug("Going to execute query " + query); - statement = dbConn.createStatement(); - rs = statement.executeQuery(query); - if (rs.next()) { - maxCsId = rs.getLong(1); - } else if (insertDone) { - throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics"); - } else { - insertDone = true; - closeStmt(statement); - statement = dbConn.createStatement(); - query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") VALUES ( " - + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics") + "," + 1 - + ")"; - try { - statement.executeUpdate(query); - } catch (SQLException e) { - // If the record is already inserted by some other thread continue to select. - if (dbType.isDuplicateKeyError(e)) { - continue; + LOG.debug("Execute query: " + query); + try (Statement statement = dbConn.createStatement(); + ResultSet rs = statement.executeQuery(query)) { + if (rs.next()) { + maxCsId = rs.getLong(1); + } else if (insertDone) { + throw new MetaException("Invalid state of SEQUENCE_TABLE for MPartitionColumnStatistics"); + } else { + insertDone = true; + query = "INSERT INTO \"SEQUENCE_TABLE\" (\"SEQUENCE_NAME\", \"NEXT_VAL\") VALUES ( " + + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics") + "," + 1 + + ")"; + try { + statement.executeUpdate(query); + } catch (SQLException e) { + // If the record is already inserted by some other thread continue to select. + if (dbType.isDuplicateKeyError(e)) { + continue; + } + LOG.error("Unable to insert into SEQUENCE_TABLE for MPartitionColumnStatistics.", e); + throw e; } - LOG.error("Unable to insert into SEQUENCE_TABLE for MPartitionColumnStatistics.", e); - throw e; - } finally { - closeStmt(statement); } } } long nextMaxCsId = maxCsId + numStats + 1; - closeStmt(statement); - statement = dbConn.createStatement(); String query = "UPDATE \"SEQUENCE_TABLE\" SET \"NEXT_VAL\" = " + nextMaxCsId + " WHERE \"SEQUENCE_NAME\" = " + quoteString("org.apache.hadoop.hive.metastore.model.MPartitionColumnStatistics"); - statement.executeUpdate(query); + + try (Statement statement = dbConn.createStatement()) { + statement.executeUpdate(query); + } dbConn.commit(); committed = true; return maxCsId; @@ -685,8 +617,8 @@ class DirectSqlUpdatePart { if (!committed) { rollbackDBConn(dbConn); } - close(rs, statement, jdoConn); - unlockInternal(); + closeDbConn(jdoConn); + dbType.unlockInternal(); } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java index 0b7127b6826..727ba0b1bf8 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/retry/SqlRetryHandler.java @@ -33,7 +33,6 @@ import org.springframework.jdbc.UncategorizedSQLException; import java.sql.SQLException; import java.util.Objects; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.ReentrantLock; import java.util.regex.Pattern; /** @@ -48,11 +47,6 @@ public class SqlRetryHandler { private final StackThreadLocal<Object> threadLocal = new StackThreadLocal<>(); - /** - * Derby specific concurrency control - */ - private static final ReentrantLock derbyLock = new ReentrantLock(true); - private final DatabaseProduct databaseProduct; private final long deadlockRetryInterval; private final long retryInterval; @@ -122,14 +116,14 @@ public class SqlRetryHandler { try { if (properties.isLockInternally()) { - lockInternal(); + databaseProduct.lockInternal(); } threadLocal.set(new Object()); return executeWithRetryInternal(properties, function); } finally { threadLocal.unset(); if (properties.isLockInternally()) { - unlockInternal(); + databaseProduct.unlockInternal(); } } } @@ -269,21 +263,4 @@ public class SqlRetryHandler { } return false; } - - /** - * lockInternal() and {@link #unlockInternal()} are used to serialize those operations that require - * Select ... For Update to sequence operations properly. In practice that means when running - * with Derby database. See more notes at class level. - */ - private void lockInternal() { - if(databaseProduct.isDERBY()) { - derbyLock.lock(); - } - } - private void unlockInternal() { - if(databaseProduct.isDERBY()) { - derbyLock.unlock(); - } - } - }