deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1308996345
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -678,316 +215,38 @@ private void setCleanerStart(Connection dbConn,
CompactionInfo info, Long timest
@RetrySemantics.CannotRetry
public void markCleaned(CompactionInfo info) throws MetaException {
LOG.debug("Running markCleaned with CompactionInfo: {}", info);
- try {
- Connection dbConn = null;
- PreparedStatement pStmt = null;
- ResultSet rs = null;
- try {
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED,
connPoolCompaction);
- if (!info.isAbortedTxnCleanup()) {
- String s = "INSERT INTO \"COMPLETED_COMPACTIONS\"(\"CC_ID\",
\"CC_DATABASE\", "
- + "\"CC_TABLE\", \"CC_PARTITION\", \"CC_STATE\", \"CC_TYPE\",
\"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", "
- + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\",
\"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", "
- + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\",
\"CC_ENQUEUE_TIME\", "
- + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\",
\"CC_INITIATOR_VERSION\", "
- + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\",
\"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\","
- + "\"CC_ORDER_BY\") "
- + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\",
\"CQ_PARTITION\", "
- + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\",
\"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", "
- + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\",
\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", "
- + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\",
\"CQ_ENQUEUE_TIME\", "
- + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\",
\"CQ_INITIATOR_VERSION\", "
- + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\",
\"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", "
- + "\"CQ_ORDER_BY\" "
- + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?";
- pStmt = dbConn.prepareStatement(s);
- pStmt.setLong(1, info.id);
- LOG.debug("Going to execute update <{}> for CQ_ID={}", s, info.id);
- pStmt.executeUpdate();
- }
-
- /* Remove compaction queue record corresponding to the compaction
which has been successful as well as
- * remove all abort retry associated metadata of table/partition in
the COMPACTION_QUEUE both when compaction
- * or abort cleanup is successful. We don't want a situation wherein
we have an abort retry entry for a table
- * but no corresponding entry in TXN_COMPONENTS table. Successful
compaction will delete
- * the retry metadata, so that abort cleanup is retried again (an
optimistic retry approach).
- */
- removeCompactionAndAbortRetryEntries(dbConn, info);
-
- if (!info.isAbortedTxnCleanup()) {
- // Remove entries from completed_txn_components as well, so we don't
start looking there
- // again but only up to the highest write ID include in this
compaction job.
- //highestWriteId will be NULL in upgrade scenarios
- String query =
- "DELETE FROM \"COMPLETED_TXN_COMPONENTS\" " +
- " WHERE \"CTC_DATABASE\" = ? AND \"CTC_TABLE\" = ?";
- if (info.partName != null) {
- query += " AND \"CTC_PARTITION\" = ?";
- }
- if (info.highestWriteId != 0) {
- query += " AND \"CTC_WRITEID\" <= ?";
- }
- pStmt = dbConn.prepareStatement(query);
- int paramCount = 1;
- pStmt.setString(paramCount++, info.dbname);
- pStmt.setString(paramCount++, info.tableName);
- if (info.partName != null) {
- pStmt.setString(paramCount++, info.partName);
- }
- if (info.highestWriteId != 0) {
- pStmt.setLong(paramCount, info.highestWriteId);
- }
- LOG.debug("Going to execute update <{}>", query);
- int updCount = pStmt.executeUpdate();
- if (updCount < 1) {
- LOG.warn("Expected to remove at least one row from
completed_txn_components when " +
- "marking compaction entry as clean!");
- }
- LOG.debug("Removed {} records from completed_txn_components",
updCount);
- }
-
- // Do cleanup of metadata in TXN_COMPONENTS table.
- removeTxnComponents(dbConn, info);
- LOG.debug("Going to commit");
- dbConn.commit();
- } catch (SQLException e) {
- LOG.error("Unable to delete from compaction queue " + e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(e, "markCleaned(" + info + ")");
- throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
- } finally {
- close(rs, pStmt, dbConn);
- }
- } catch (RetryException e) {
- markCleaned(info);
- }
- }
-
- private void removeTxnComponents(Connection dbConn, CompactionInfo info)
throws MetaException, RetryException {
- PreparedStatement pStmt = null;
- ResultSet rs = null;
- try {
- /*
- * compaction may remove data from aborted txns above tc_writeid bit it
only guarantees to
- * remove it up to (inclusive) tc_writeid, so it's critical to not
remove metadata about
- * aborted TXN_COMPONENTS above tc_writeid (and consequently about
aborted txns).
- * See {@link ql.txn.compactor.Cleaner.removeFiles()}
- */
- String s = "DELETE FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\" IN ( "
- + "SELECT \"TXN_ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " +
TxnStatus.ABORTED + ") "
- + "AND \"TC_DATABASE\" = ? AND \"TC_TABLE\" = ? "
- + "AND \"TC_PARTITION\" " + TxnUtils.nvl(info.partName);
-
- List<String> queries = new ArrayList<>();
- Iterator<Long> writeIdsIter = null;
- List<Integer> counts = null;
-
- if (info.writeIds != null && !info.writeIds.isEmpty()) {
- StringBuilder prefix = new StringBuilder(s).append(" AND ");
- List<String> questions = Collections.nCopies(info.writeIds.size(),
"?");
-
- counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix,
- new StringBuilder(), questions, "\"TC_WRITEID\"", false,
false);
- writeIdsIter = info.writeIds.iterator();
- } else if (!info.hasUncompactedAborts) {
- if (info.highestWriteId != 0) {
- s += " AND \"TC_WRITEID\" <= ?";
- }
- queries.add(s);
- }
-
- for (int i = 0; i < queries.size(); i++) {
- String query = queries.get(i);
- int writeIdCount = (counts != null) ? counts.get(i) : 0;
-
- LOG.debug("Going to execute update <{}>", query);
- pStmt = dbConn.prepareStatement(query);
- int paramCount = 1;
-
- pStmt.setString(paramCount++, info.dbname);
- pStmt.setString(paramCount++, info.tableName);
- if (info.partName != null) {
- pStmt.setString(paramCount++, info.partName);
- }
- if (info.highestWriteId != 0 && writeIdCount == 0) {
- pStmt.setLong(paramCount, info.highestWriteId);
- }
- for (int j = 0; j < writeIdCount; j++) {
- if (writeIdsIter.hasNext()) {
- pStmt.setLong(paramCount + j, writeIdsIter.next());
- }
- }
- int rc = pStmt.executeUpdate();
- LOG.debug("Removed {} records from txn_components", rc);
- }
- } catch (SQLException e) {
- LOG.error("Unable to delete from txn components due to {}",
e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(e, "markCleanedForAborts(" + info + ")");
- throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
- } finally {
- close(rs);
- closeStmt(pStmt);
- }
+ new MarkCleanedFunction(info, dbProduct, conf).execute(jdbcTemplate);
}
-
+
/**
* Clean up entries from TXN_TO_WRITE_ID table less than
min_uncommited_txnid as found by
* min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted
TXNS.txn_id)).
*/
@Override
@RetrySemantics.SafeToRetry
public void cleanTxnToWriteIdTable() throws MetaException {
- try {
- Connection dbConn = null;
- Statement stmt = null;
- ResultSet rs = null;
-
- try {
- long minTxnIdSeenOpen = findMinTxnIdSeenOpen();
-
- // We query for minimum values in all the queries and they can only
increase by any concurrent
- // operations. So, READ COMMITTED is sufficient.
- dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED,
connPoolCompaction);
- stmt = dbConn.createStatement();
-
- // First need to find the min_uncommitted_txnid which is currently
seen by any open transactions.
- // If there are no txns which are currently open or aborted in the
system, then current value of
- // max(TXNS.txn_id) could be min_uncommitted_txnid.
- String s = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
- " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
- (useMinHistoryLevel ? "" :
- " UNION" +
- " SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"") +
- " UNION" +
- " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\"
= " + TxnStatus.ABORTED +
- (useMinHistoryLevel ? "" :
- " OR \"TXN_STATE\" = " + TxnStatus.OPEN) +
- " ) \"RES\"";
+ long minTxnIdSeenOpen = findMinTxnIdSeenOpen();
- LOG.debug("Going to execute query <{}>", s);
- rs = stmt.executeQuery(s);
- if (!rs.next()) {
- throw new MetaException("Transaction tables not properly
initialized, no record found in TXNS");
- }
- long minUncommitedTxnid = Math.min(rs.getLong(1), minTxnIdSeenOpen);
-
- // As all txns below min_uncommitted_txnid are either committed or
empty_aborted, we are allowed
- // to cleanup the entries less than min_uncommitted_txnid from the
TXN_TO_WRITE_ID table.
- s = "DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < " +
minUncommitedTxnid;
- LOG.debug("Going to execute delete <{}>", s);
- int rc = stmt.executeUpdate(s);
- LOG.info("Removed {} rows from TXN_TO_WRITE_ID with Txn
Low-Water-Mark: {}", rc, minUncommitedTxnid);
-
- LOG.debug("Going to commit");
- dbConn.commit();
- } catch (SQLException e) {
- LOG.error("Unable to delete from TXN_TO_WRITE_ID table " +
e.getMessage());
- LOG.debug("Going to rollback");
- rollbackDBConn(dbConn);
- checkRetryable(e, "cleanTxnToWriteIdTable");
- throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
- } finally {
- close(rs, stmt, dbConn);
- }
- } catch (RetryException e) {
- cleanTxnToWriteIdTable();
+ // First need to find the min_uncommitted_txnid which is currently seen by
any open transactions.
+ // If there are no txns which are currently open or aborted in the system,
then current value of
+ // max(TXNS.txn_id) could be min_uncommitted_txnid.
+ Long minTxnId = jdbcTemplate.execute(new
MinUncommittedTxnIdHandler(useMinHistoryLevel));
+ if (minTxnId == null) {
Review Comment:
why that is not part of MinUncommittedTxnIdHandler?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]