This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 01035cd HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary) 01035cd is described below commit 01035cd6097bd0df1deee35e5d375a80fc6e4dc7 Author: Laszlo Pinter <lpin...@cloudera.com> AuthorDate: Wed Apr 15 15:01:21 2020 +0200 HIVE-23107: Remove MIN_HISTORY_LEVEL table (Laszlo Pinter, reviewed by Peter Vary) --- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 12 -- .../hadoop/hive/metastore/txn/CompactionInfo.java | 1 + .../hive/metastore/txn/CompactionTxnHandler.java | 132 ++++++++++----------- .../hadoop/hive/metastore/txn/TxnDbUtil.java | 3 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 53 +-------- .../apache/hadoop/hive/metastore/txn/TxnStore.java | 18 ++- .../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 11 +- .../sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql | 4 + .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 12 +- .../sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql | 4 + .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 11 +- .../sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql | 4 + .../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 11 +- .../sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql | 4 + .../sql/postgres/hive-schema-4.0.0.postgres.sql | 11 +- .../postgres/upgrade-3.2.0-to-4.0.0.postgres.sql | 4 + 17 files changed, 108 insertions(+), 189 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 54b616e..5fa3d9a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -92,7 +92,7 @@ public class Cleaner extends MetaStoreCompactorThread { try { handle = txnHandler.getMutexAPI().acquireLock(TxnStore.MUTEX_KEY.Cleaner.name()); startedAt = System.currentTimeMillis(); - long minOpenTxnId = 
txnHandler.findMinOpenTxnId(); + long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner(); for(CompactionInfo compactionInfo : txnHandler.findReadyToClean()) { clean(compactionInfo, minOpenTxnId); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index f3834cc..2c13e8d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -2045,13 +2045,10 @@ public class TestTxnCommands2 { // All inserts are committed and hence would expect in TXN_TO_WRITE_ID, 3 entries for acidTbl // and 2 entries for acidTblPart as each insert would have allocated a writeid. - // Also MIN_HISTORY_LEVEL won't have any entries as no reference for open txns. String acidTblWhereClause = " where t2w_database = " + quoteString("default") + " and t2w_table = " + quoteString(Table.ACIDTBL.name().toLowerCase()); String acidTblPartWhereClause = " where t2w_database = " + quoteString("default") + " and t2w_table = " + quoteString(Table.ACIDTBLPART.name().toLowerCase()); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL")); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause), @@ -2091,13 +2088,10 @@ public class TestTxnCommands2 { // We would expect 4 entries in TXN_TO_WRITE_ID as each insert would have allocated a writeid // including aborted one. - // Also MIN_HISTORY_LEVEL will have 1 entry for the open txn. 
Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause), 3, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause)); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblPartWhereClause), 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblPartWhereClause)); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL")); // The entry relevant to aborted txns shouldn't be removed from TXN_TO_WRITE_ID as // aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained. @@ -2112,7 +2106,6 @@ public class TestTxnCommands2 { // The cleaner will removed aborted txns data/metadata but cannot remove aborted txn2 from TXN_TO_WRITE_ID // as there is a open txn < aborted txn2. The aborted txn1 < open txn and will be removed. // Also, committed txn > open txn is retained. - // MIN_HISTORY_LEVEL will have 1 entry for the open txn. txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR)); runWorker(hiveConf); runCleaner(hiveConf); @@ -2120,18 +2113,13 @@ public class TestTxnCommands2 { txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 2, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"), - 1, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL")); // Commit the open txn, which lets the cleanup on TXN_TO_WRITE_ID. - // Now all txns are removed from MIN_HISTORY_LEVEL. So, all entries from TXN_TO_WRITE_ID would be cleaned. 
txnMgr.commitTxn(); txnHandler.cleanTxnToWriteIdTable(); Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"), 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID")); - Assert.assertEquals(TxnDbUtil.queryToString(hiveConf, "select * from MIN_HISTORY_LEVEL"), - 0, TxnDbUtil.countQueryAgent(hiveConf, "select count(*) from MIN_HISTORY_LEVEL")); } private void verifyDirAndResult(int expectedDeltas) throws Exception { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index bf91ae7..70d63ab 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -225,6 +225,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { cr.setWorkerId(ci.workerId); cr.setHighestWriteId(ci.highestWriteId); cr.setErrorMessage(ci.errorMessage); + return cr; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 19a95b6..2344c2d 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -221,8 +221,10 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + - "\"CQ_WORKER_ID\" = NULL WHERE 
\"CQ_ID\" = " + info.id; + String s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_STATE\" = '" + READY_FOR_CLEANING + "', " + + "\"CQ_WORKER_ID\" = NULL, \"CQ_NEXT_TXN_ID\" = " + + "(SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\")" + + " WHERE \"CQ_ID\" = " + info.id; LOG.debug("Going to execute update <" + s + ">"); int updCnt = stmt.executeUpdate(s); if (updCnt != 1) { @@ -302,57 +304,6 @@ class CompactionTxnHandler extends TxnHandler { return findReadyToClean(); } } - @Override - public long findMinOpenTxnId() throws MetaException { - Connection dbConn = null; - Statement stmt = null; - ResultSet rs = null; - try { - try { - dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - stmt = dbConn.createStatement(); - return findMinOpenTxnGLB(stmt); - } catch (SQLException e) { - LOG.error("Unable to findMinOpenTxnId() due to:" + e.getMessage()); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "findMinOpenTxnId"); - throw new MetaException("Unable to execute findMinOpenTxnId() " + - StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - } - } catch (RetryException e) { - return findMinOpenTxnId(); - } - } - - /** - * See doc at {@link TxnStore#findMinOpenTxnId()} - * Note that {@link #openTxns(OpenTxnRequest)} makes update of NEXT_TXN and MIN_HISTORY_LEVEL - * a single atomic operation (and no one else should update these tables except the cleaner - * which deletes rows from MIN_HISTORY_LEVEL which can only allow minOpenTxn to move higher) - */ - private long findMinOpenTxnGLB(Statement stmt) throws MetaException, SQLException { - String s = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; - LOG.debug("Going to execute query <" + s + ">"); - ResultSet rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction tables not properly " + - "initialized, no record found in next_txn_id"); - } - long hwm = rs.getLong(1); - s = "SELECT MIN(\"MHL_MIN_OPEN_TXNID\") FROM \"MIN_HISTORY_LEVEL\""; - LOG.debug("Going to execute query 
<" + s + ">"); - rs = stmt.executeQuery(s); - rs.next(); - long minOpenTxnId = rs.getLong(1); - if(rs.wasNull()) { - return hwm; - } - //since generating new txnid uses select for update on single row in NEXT_TXN_ID - assert hwm >= minOpenTxnId : "(hwm, minOpenTxnId)=(" + hwm + "," + minOpenTxnId + ")"; - return minOpenTxnId; - } /** * This will remove an entry from the queue after @@ -523,7 +474,7 @@ class CompactionTxnHandler extends TxnHandler { } /** * Clean up entries from TXN_TO_WRITE_ID table less than min_uncommited_txnid as found by - * min(NEXT_TXN_ID.ntxn_next, min(MIN_HISTORY_LEVEL.mhl_min_open_txnid), min(Aborted TXNS.txn_id)). + * min(NEXT_TXN_ID.ntxn_next, min(WRITE_SET.WS_COMMIT_ID), min(Aborted TXNS.txn_id)). */ @Override @RetrySemantics.SafeToRetry @@ -542,25 +493,27 @@ class CompactionTxnHandler extends TxnHandler { // First need to find the min_uncommitted_txnid which is currently seen by any open transactions. // If there are no txns which are currently open or aborted in the system, then current value of // NEXT_TXN_ID.ntxn_next could be min_uncommitted_txnid. - long minUncommittedTxnId = findMinOpenTxnGLB(stmt); - - // If there are aborted txns, then the minimum aborted txnid could be the min_uncommitted_txnid - // if lesser than both NEXT_TXN_ID.ntxn_next and min(MIN_HISTORY_LEVEL .mhl_min_open_txnid). 
- String s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED); + String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" + + "SELECT MIN(\"NTXN_NEXT\") AS \"ID\" FROM \"NEXT_TXN_ID\" " + + "UNION " + + "SELECT MIN(\"WS_COMMIT_ID\") AS \"ID\" FROM \"WRITE_SET\" " + + "UNION " + + "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_ABORTED) + + " OR \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + + ") \"RES\""; LOG.debug("Going to execute query <" + s + ">"); rs = stmt.executeQuery(s); - if (rs.next()) { - long minAbortedTxnId = rs.getLong(1); - if (minAbortedTxnId > 0) { - minUncommittedTxnId = Math.min(minAbortedTxnId, minUncommittedTxnId); - } + if (!rs.next()) { + throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID"); } + long minUncommitedTxnid = rs.getLong(1); + // As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed // to cleanup the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table. 
- s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommittedTxnId; + s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " + minUncommitedTxnid; LOG.debug("Going to execute delete <" + s + ">"); int rc = stmt.executeUpdate(s); - LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommittedTxnId); + LOG.info("Removed " + rc + " rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: " + minUncommitedTxnid); LOG.debug("Going to commit"); dbConn.commit(); @@ -1168,6 +1121,53 @@ class CompactionTxnHandler extends TxnHandler { setHadoopJobId(hadoopJobId, id); } } + + @Override + @RetrySemantics.Idempotent + public long findMinOpenTxnIdForCleaner() throws MetaException{ + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + try { + try { + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + String query = "SELECT COUNT(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute query <" + query + ">"); + rs = stmt.executeQuery(query); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly initialized."); + } + long numOpenTxns = rs.getLong(1); + if (numOpenTxns > 0) { + query = "SELECT MIN(\"RES\".\"ID\") FROM (" + + "SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN) + + " UNION " + + "SELECT MAX(\"CQ_NEXT_TXN_ID\") AS \"ID\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = " + + quoteChar(READY_FOR_CLEANING) + + ") \"RES\""; + } else { + query = "SELECT \"NTXN_NEXT\" FROM \"NEXT_TXN_ID\""; + } + LOG.debug("Going to execute query <" + query + ">"); + rs = stmt.executeQuery(query); + if (!rs.next()) { + throw new MetaException("Transaction tables not properly initialized, no record found in NEXT_TXN_ID"); + } + return rs.getLong(1); + } catch (SQLException e) { + LOG.error("Unable to getMinOpenTxnIdForCleaner", e); + rollbackDBConn(dbConn); + 
checkRetryable(dbConn, e, "findMinOpenTxnIdForCleaner"); + throw new MetaException("Unable to execute findMinOpenTxnIdForCleaner() " + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + } + } catch (RetryException e) { + return findMinOpenTxnIdForCleaner(); + } + } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java index 620c77e..a66e169 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java @@ -173,7 +173,8 @@ public final class TxnDbUtil { " CQ_HIGHEST_WRITE_ID bigint," + " CQ_META_INFO varchar(2048) for bit data," + " CQ_HADOOP_JOB_ID varchar(32)," + - " CQ_ERROR_MESSAGE clob)"); + " CQ_ERROR_MESSAGE clob," + + " CQ_NEXT_TXN_ID bigint)"); stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)"); stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)"); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index 7d0db0c..be4d63c 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -645,48 +645,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { pst.execute(); } - // Need to register minimum open txnid for current transactions into MIN_HISTORY table. - // For a single txn we can do it in a single insert. 
With multiple txns calculating the - // minOpenTxnId for every insert is not cost effective, so caching the value - if (txnIds.size() == 1) { - s = "INSERT INTO \"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\",\"MHL_MIN_OPEN_TXNID\") " + - "SELECT ?, MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - try (PreparedStatement pstmt = dbConn.prepareStatement(s)) { - pstmt.setLong(1, txnIds.get(0)); - pstmt.execute(); - } - LOG.info("Added entries to MIN_HISTORY_LEVEL with a single query for current txn: " + txnIds); - } else { - s = "SELECT MIN(\"TXN_ID\") FROM \"TXNS\" WHERE \"TXN_STATE\" = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - long minOpenTxnId = -1L; - try(ResultSet minOpenTxnIdRs = stmt.executeQuery(s)) { - if (!minOpenTxnIdRs.next()) { - throw new IllegalStateException("Scalar query returned no rows?!?!!"); - } - // TXNS table should have at least one entry because we just inserted the newly opened txns. - // So, min(txn_id) would be a non-zero txnid. - minOpenTxnId = minOpenTxnIdRs.getLong(1); - } - - assert (minOpenTxnId > 0); - rows.clear(); - for (long txnId = first; txnId < first + numTxns; txnId++) { - rows.add(txnId + ", " + minOpenTxnId); - } - - // Insert transaction entries into MIN_HISTORY_LEVEL. 
- List<String> inserts = sqlGenerator.createInsertValuesStmt( - "\"MIN_HISTORY_LEVEL\" (\"MHL_TXNID\", \"MHL_MIN_OPEN_TXNID\")", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); - } - LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds - + ") with min_open_txn: " + minOpenTxnId); - } - if (rqst.isSetReplPolicy()) { List<String> rowsRepl = new ArrayList<>(); for (PreparedStatement pst : insertPreparedStmts) { @@ -1330,7 +1288,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { deleteReplTxnMapEntry(dbConn, sourceTxnId, rqst.getReplPolicy()); } cleanUpTxnRelatedMetadata(txnid, stmt); - // update the key/value associated with the transaction if it has been // set if (rqst.isSetKeyValue()) { @@ -1391,10 +1348,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = " + txnid, "DELETE FROM \"HIVE_LOCKS\" WHERE \"HL_TXNID\" = " + txnid, "DELETE FROM \"TXNS\" WHERE \"TXN_ID\" = " + txnid, - "DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE \"MHL_TXNID\" = " + txnid, "DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE \"MRL_TXN_ID\" = " + txnid); executeQueriesInBatch(stmt, queries, conf); - LOG.info("Removed committed transaction: (" + txnid + ") from MIN_HISTORY_LEVEL"); } /** @@ -4280,20 +4235,14 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"TXN_ID\"", true, false); int numUpdateQueries = queries.size(); - // add delete min history queries to query list - prefix.setLength(0); - suffix.setLength(0); - prefix.append("DELETE FROM \"MIN_HISTORY_LEVEL\" WHERE "); - TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"MHL_TXNID\"", false, false); - // add delete hive locks queries to query list prefix.setLength(0); + suffix.setLength(0); prefix.append("DELETE FROM \"HIVE_LOCKS\" WHERE 
"); TxnUtils.buildQueryWithINClause(conf, queries, prefix, suffix, txnids, "\"HL_TXNID\"", false, false); // execute all queries in the list in one batch List<Integer> affectedRowsByQuery = executeQueriesInBatch(stmt, queries, conf); - LOG.info("Removed aborted transactions: (" + txnids + ") from MIN_HISTORY_LEVEL"); return getUpdateCount(numUpdateQueries, affectedRowsByQuery); } finally { closeStmt(stmt); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 41d2e79..87130a5 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -360,16 +360,6 @@ public interface TxnStore extends Configurable { List<CompactionInfo> findReadyToClean() throws MetaException; /** - * Returns the smallest txnid that could be seen in open state across all active transactions in - * the system or {@code NEXT_TXN_ID.NTXN_NEXT} if there are no active transactions, i.e. the - * smallest txnid that can be seen as unresolved in the whole system. Even if a transaction - * is opened concurrently with this call it cannot have an id less than what this method returns. - * @return transaction ID - */ - @RetrySemantics.ReadOnly - long findMinOpenTxnId() throws MetaException; - - /** * This will remove an entry from the queue after * it has been compacted. * @@ -517,4 +507,12 @@ public interface TxnStore extends Configurable { */ @RetrySemantics.Idempotent void addWriteNotificationLog(AcidWriteEvent acidWriteEvent) throws MetaException; + + /** + * Return the currently seen minimum open transaction ID. 
+ * @return minimum transaction ID + * @throws MetaException + */ + @RetrySemantics.Idempotent + long findMinOpenTxnIdForCleaner() throws MetaException; } diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index 05adbe9..1ace9d3 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -603,7 +603,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_HIGHEST_WRITE_ID bigint, CQ_META_INFO varchar(2048) for bit data, CQ_HADOOP_JOB_ID varchar(32), - CQ_ERROR_MESSAGE clob + CQ_ERROR_MESSAGE clob, + CQ_NEXT_TXN_ID bigint ); CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( @@ -667,14 +668,6 @@ CREATE TABLE NEXT_WRITE_ID ( CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); -CREATE TABLE MIN_HISTORY_LEVEL ( - MHL_TXNID bigint NOT NULL, - MHL_MIN_OPEN_TXNID bigint NOT NULL, - PRIMARY KEY(MHL_TXNID) -); - -CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID); - CREATE TABLE MATERIALIZATION_REBUILD_LOCKS ( MRL_TXN_ID BIGINT NOT NULL, MRL_DB_NAME VARCHAR(128) NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql index 35a2e64..8a3cd56 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-3.2.0-to-4.0.0.derby.sql @@ -64,5 +64,9 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- HIVE-22995 ALTER TABLE "APP"."DBS" ADD COLUMN "DB_MANAGED_LOCATION_URI" VARCHAR(4000); +-- HIVE-23107 +ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint; +DROP TABLE MIN_HISTORY_LEVEL; + -- This needs to be the last 
thing done. Insert any changes above this line. UPDATE "APP".VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index f3c74bf..2e01777 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1016,6 +1016,7 @@ CREATE TABLE COMPACTION_QUEUE( CQ_META_INFO varbinary(2048) NULL, CQ_HADOOP_JOB_ID nvarchar(128) NULL, CQ_ERROR_MESSAGE varchar(max) NULL, + CQ_NEXT_TXN_ID bigint NULL, PRIMARY KEY CLUSTERED ( CQ_ID ASC @@ -1200,17 +1201,6 @@ CREATE TABLE NEXT_WRITE_ID ( CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); -CREATE TABLE MIN_HISTORY_LEVEL ( - MHL_TXNID bigint NOT NULL, - MHL_MIN_OPEN_TXNID bigint NOT NULL, -PRIMARY KEY CLUSTERED -( - MHL_TXNID ASC -) -); - -CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID); - CREATE TABLE MATERIALIZATION_REBUILD_LOCKS ( MRL_TXN_ID bigint NOT NULL, MRL_DB_NAME nvarchar(128) NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql index 228bb7c..9f39515 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-3.2.0-to-4.0.0.mssql.sql @@ -67,6 +67,10 @@ INSERT INTO NOTIFICATION_SEQUENCE (NNI_ID, NEXT_EVENT_ID) SELECT 1,1 WHERE NOT E -- HIVE-22995 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI nvarchar(4000); +-- HIVE-23107 +ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint NULL; +DROP TABLE MIN_HISTORY_LEVEL; + -- These lines need to be last. 
Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 626d888..0512a45 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -1072,7 +1072,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_HIGHEST_WRITE_ID bigint, CQ_META_INFO varbinary(2048), CQ_HADOOP_JOB_ID varchar(32), - CQ_ERROR_MESSAGE mediumtext + CQ_ERROR_MESSAGE mediumtext, + CQ_NEXT_TXN_ID bigint ) ENGINE=InnoDB DEFAULT CHARSET=latin1; CREATE TABLE COMPLETED_COMPACTIONS ( @@ -1134,14 +1135,6 @@ CREATE TABLE NEXT_WRITE_ID ( CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); -CREATE TABLE MIN_HISTORY_LEVEL ( - MHL_TXNID bigint NOT NULL, - MHL_MIN_OPEN_TXNID bigint NOT NULL, - PRIMARY KEY(MHL_TXNID) -) ENGINE=InnoDB DEFAULT CHARSET=latin1; - -CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID); - CREATE TABLE MATERIALIZATION_REBUILD_LOCKS ( MRL_TXN_ID bigint NOT NULL, MRL_DB_NAME VARCHAR(128) NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql index 35da7b5..4b82e36 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-3.2.0-to-4.0.0.mysql.sql @@ -68,6 +68,10 @@ ALTER TABLE SCHEDULED_QUERIES ADD COLUMN ACTIVE_EXECUTION_ID INTEGER ; -- HIVE-22995 ALTER TABLE DBS ADD COLUMN DB_MANAGED_LOCATION_URI VARCHAR(4000) CHARACTER SET 
latin1 COLLATE latin1_bin; +-- HIVE-23107 +ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID bigint; +DROP TABLE MIN_HISTORY_LEVEL; + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS MESSAGE; diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index a25f4e4..db398e5 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -1053,7 +1053,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_HIGHEST_WRITE_ID NUMBER(19), CQ_META_INFO BLOB, CQ_HADOOP_JOB_ID varchar2(32), - CQ_ERROR_MESSAGE CLOB + CQ_ERROR_MESSAGE CLOB, + CQ_NEXT_TXN_ID NUMBER(19) ) ROWDEPENDENCIES; CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( @@ -1115,14 +1116,6 @@ CREATE TABLE NEXT_WRITE_ID ( CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); -CREATE TABLE MIN_HISTORY_LEVEL ( - MHL_TXNID NUMBER(19) NOT NULL, - MHL_MIN_OPEN_TXNID NUMBER(19) NOT NULL, - PRIMARY KEY(MHL_TXNID) -); - -CREATE INDEX MIN_HISTORY_LEVEL_IDX ON MIN_HISTORY_LEVEL (MHL_MIN_OPEN_TXNID); - CREATE TABLE MATERIALIZATION_REBUILD_LOCKS ( MRL_TXN_ID NUMBER NOT NULL, MRL_DB_NAME VARCHAR(128) NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql index d462b4a..1be83fc 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-3.2.0-to-4.0.0.oracle.sql @@ -68,6 +68,10 @@ ALTER TABLE SCHEDULED_QUERIES ADD 
ACTIVE_EXECUTION_ID number(19); -- HIVE-22995 ALTER TABLE DBS ADD DB_MANAGED_LOCATION_URI VARCHAR2(4000) NULL; +-- HIVE-23107 +ALTER TABLE COMPACTION_QUEUE ADD CQ_NEXT_TXN_ID NUMBER(19); +DROP TABLE MIN_HISTORY_LEVEL; + -- These lines need to be last. Insert any changes above. UPDATE VERSION SET SCHEMA_VERSION='4.0.0', VERSION_COMMENT='Hive release version 4.0.0' where VER_ID=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0' AS Status from dual; diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index 2066340..e6e3016 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1739,7 +1739,8 @@ CREATE TABLE "COMPACTION_QUEUE" ( "CQ_HIGHEST_WRITE_ID" bigint, "CQ_META_INFO" bytea, "CQ_HADOOP_JOB_ID" varchar(32), - "CQ_ERROR_MESSAGE" text + "CQ_ERROR_MESSAGE" text, + "CQ_NEXT_TXN_ID" bigint ); CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" ( @@ -1801,14 +1802,6 @@ CREATE TABLE "NEXT_WRITE_ID" ( CREATE UNIQUE INDEX "NEXT_WRITE_ID_IDX" ON "NEXT_WRITE_ID" ("NWI_DATABASE", "NWI_TABLE"); -CREATE TABLE "MIN_HISTORY_LEVEL" ( - "MHL_TXNID" bigint NOT NULL, - "MHL_MIN_OPEN_TXNID" bigint NOT NULL, - PRIMARY KEY("MHL_TXNID") -); - -CREATE INDEX "MIN_HISTORY_LEVEL_IDX" ON "MIN_HISTORY_LEVEL" ("MHL_MIN_OPEN_TXNID"); - CREATE TABLE "MATERIALIZATION_REBUILD_LOCKS" ( "MRL_TXN_ID" bigint NOT NULL, "MRL_DB_NAME" varchar(128) NOT NULL, diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql index a50a071..b90cecb 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql +++ 
b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-3.2.0-to-4.0.0.postgres.sql @@ -199,6 +199,10 @@ ALTER TABLE "SCHEDULED_QUERIES" ADD "ACTIVE_EXECUTION_ID" bigint; -- HIVE-22995 ALTER TABLE "DBS" ADD "DB_MANAGED_LOCATION_URI" character varying(4000); +-- HIVE-23107 +ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NEXT_TXN_ID" bigint; +DROP TABLE "MIN_HISTORY_LEVEL"; + -- These lines need to be last. Insert any changes above. UPDATE "VERSION" SET "SCHEMA_VERSION"='4.0.0', "VERSION_COMMENT"='Hive release version 4.0.0' where "VER_ID"=1; SELECT 'Finished upgrading MetaStore schema from 3.2.0 to 4.0.0';