deniskuzZ commented on code in PR #4384:
URL: https://github.com/apache/hive/pull/4384#discussion_r1293066381
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java:
##########
@@ -1396,245 +388,59 @@ private void insertAbortRetryRetentionTimeOnError(Connection dbConn, CompactionI
   @Override
   @RetrySemantics.SafeToRetry
   public void purgeCompactionHistory() throws MetaException {
-    Connection dbConn = null;
-    Statement stmt = null;
-    PreparedStatement pStmt = null;
-    ResultSet rs = null;
-    List<Long> deleteSet = new ArrayList<>();
-    RetentionCounters rc = null;
-    long timeoutThreshold = System.currentTimeMillis() -
-        MetastoreConf.getTimeVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_TIMEOUT, TimeUnit.MILLISECONDS);
-    int didNotInitiateRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_DID_NOT_INITIATE);
-    int failedRetention = getFailedCompactionRetention();
-    int succeededRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_SUCCEEDED);
-    int refusedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_REFUSED);
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
-        stmt = dbConn.createStatement();
-        /* cc_id is monotonically increasing so for any entity sorts in order of compaction history,
-        thus this query groups by entity and withing group sorts most recent first */
-        rs = stmt.executeQuery("SELECT \"CC_ID\", \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\", "
-            + "\"CC_STATE\" , \"CC_START\", \"CC_TYPE\" "
-            + "FROM \"COMPLETED_COMPACTIONS\" ORDER BY \"CC_DATABASE\", \"CC_TABLE\", \"CC_PARTITION\"," +
-            "\"CC_ID\" DESC");
-        String lastCompactedEntity = null;
-        /* In each group, walk from most recent and count occurrences of each state type. Once you
-         * have counted enough (for each state) to satisfy retention policy, delete all other
-         * instances of this status, plus timed-out entries (see this method's JavaDoc).
-         */
-        while(rs.next()) {
-          CompactionInfo ci = new CompactionInfo(
-              rs.getLong(1), rs.getString(2), rs.getString(3),
-              rs.getString(4), rs.getString(5).charAt(0));
-          ci.start = rs.getLong(6);
-          ci.type = TxnUtils.dbCompactionType2ThriftType(rs.getString(7).charAt(0));
-          if(!ci.getFullPartitionName().equals(lastCompactedEntity)) {
-            lastCompactedEntity = ci.getFullPartitionName();
-            rc = new RetentionCounters(didNotInitiateRetention, failedRetention, succeededRetention, refusedRetention);
-          }
-          checkForDeletion(deleteSet, ci, rc, timeoutThreshold);
-        }
-        close(rs);
-
-        if (deleteSet.size() <= 0) {
-          return;
-        }
-
-        List<String> queries = new ArrayList<>();
-
-        StringBuilder prefix = new StringBuilder();
-        StringBuilder suffix = new StringBuilder();
-
-        prefix.append("DELETE FROM \"COMPLETED_COMPACTIONS\" WHERE ");
-
-        List<String> questions = new ArrayList<>(deleteSet.size());
-        for (int i = 0; i < deleteSet.size(); i++) {
-          questions.add("?");
-        }
-        List<Integer> counts = TxnUtils.buildQueryWithINClauseStrings(conf, queries, prefix, suffix, questions,
-            "\"CC_ID\"", false, false);
-        int totalCount = 0;
-        for (int i = 0; i < queries.size(); i++) {
-          String query = queries.get(i);
-          long insertCount = counts.get(i);
-          LOG.debug("Going to execute update <{}>", query);
-          pStmt = dbConn.prepareStatement(query);
-          for (int j = 0; j < insertCount; j++) {
-            pStmt.setLong(j + 1, deleteSet.get(totalCount + j));
-          }
-          totalCount += insertCount;
-          int count = pStmt.executeUpdate();
-          LOG.debug("Removed {} records from COMPLETED_COMPACTIONS", count);
-        }
-        dbConn.commit();
-      } catch (SQLException e) {
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "purgeCompactionHistory()");
-        throw new MetaException(DB_FAILED_TO_CONNECT + e.getMessage());
-      } finally {
-        close(rs, stmt, dbConn);
-        closeStmt(pStmt);
-      }
-    } catch (RetryException ex) {
-      purgeCompactionHistory();
-    }
-  }
-
-  /**
-   * this ensures that the number of failed compaction entries retained is > than number of failed
-   * compaction threshold which prevents new compactions from being scheduled.
-   */
-  private int getFailedCompactionRetention() {
-    int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-    int failedRetention = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED);
-    if(failedRetention < failedThreshold) {
-      LOG.warn("Invalid configuration {}={} < {}={}. Will use {}={}",
-          ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname(), failedRetention,
-          ConfVars.COMPACTOR_HISTORY_RETENTION_FAILED, failedRetention,
-          ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD.getVarname(), failedRetention);
-      failedRetention = failedThreshold;
-    }
-    return failedRetention;
+    new PurgeCompactionHistoryFunction(conf).call(dataSourceWrapper);
  }
   /**
    * Returns {@code true} if there already exists sufficient number of consecutive failures for
    * this table/partition so that no new automatic compactions will be scheduled.
    * User initiated compactions don't do this check.
-   *
    * Do we allow compacting whole table (when it's partitioned)? No, though perhaps we should.
    * That would be a meta operations, i.e. first find all partitions for this table (which have
    * txn info) and schedule each compaction separately. This avoids complications in this logic.
    */
   @Override
   @RetrySemantics.ReadOnly
   public boolean checkFailedCompactions(CompactionInfo ci) throws MetaException {
-    Connection dbConn = null;
-    PreparedStatement pStmt = null;
-    ResultSet rs = null;
-    try {
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
-        pStmt = dbConn.prepareStatement("SELECT \"CC_STATE\", \"CC_ENQUEUE_TIME\" FROM \"COMPLETED_COMPACTIONS\" WHERE " +
-            "\"CC_DATABASE\" = ? AND " +
-            "\"CC_TABLE\" = ? " +
-            (ci.partName != null ? "AND \"CC_PARTITION\" = ?" : "") +
-            " AND \"CC_STATE\" != " + quoteChar(DID_NOT_INITIATE) + " ORDER BY \"CC_ID\" DESC");
-        pStmt.setString(1, ci.dbname);
-        pStmt.setString(2, ci.tableName);
-        if (ci.partName != null) {
-          pStmt.setString(3, ci.partName);
-        }
-        rs = pStmt.executeQuery();
-        int numFailed = 0;
-        int numTotal = 0;
-        long lastEnqueueTime = -1;
-        int failedThreshold = MetastoreConf.getIntVar(conf, ConfVars.COMPACTOR_INITIATOR_FAILED_THRESHOLD);
-        while(rs.next() && ++numTotal <= failedThreshold) {
-          long enqueueTime = rs.getLong(2);
-          if (enqueueTime > lastEnqueueTime) {
-            lastEnqueueTime = enqueueTime;
-          }
-          if(rs.getString(1).charAt(0) == FAILED_STATE) {
-            numFailed++;
-          }
-          else {
-            numFailed--;
-          }
-        }
-        // If the last attempt was too long ago, ignore the failed threshold and try compaction again
-        long retryTime = MetastoreConf.getTimeVar(conf,
-            ConfVars.COMPACTOR_INITIATOR_FAILED_RETRY_TIME, TimeUnit.MILLISECONDS);
-        boolean needsRetry = (retryTime > 0) && (lastEnqueueTime + retryTime < System.currentTimeMillis());
-        return (numFailed == failedThreshold) && !needsRetry;
-      }
-      catch (SQLException e) {
-        LOG.error("Unable to check for failed compactions", e);
-        LOG.debug("Going to rollback");
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "checkFailedCompactions(" + ci + ")");
-        LOG.error(DB_FAILED_TO_CONNECT, e);
-        return false;//weren't able to check
-      } finally {
-        close(rs, pStmt, dbConn);
-      }
-    } catch (RetryException e) {
-      return checkFailedCompactions(ci);
-    }
+    return new CheckFailedCompactionsHandler(conf, ci).execute(dataSourceWrapper);
   }
-
   private void updateStatus(CompactionInfo ci) throws MetaException {
     String strState = CompactionState.fromSqlConst(ci.state).toString();
+
     LOG.debug("Marking as {}: CompactionInfo: {}", strState, ci);
-    try {
-      Connection dbConn = null;
-      Statement stmt = null;
-      PreparedStatement pStmt = null;
-      ResultSet rs = null;
-      try {
-        dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction);
-        stmt = dbConn.createStatement();
-        pStmt = dbConn.prepareStatement("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
-            + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
-            + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
-            + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
-            + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", "
-            + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?");
-        pStmt.setLong(1, ci.id);
-        rs = pStmt.executeQuery();
-        if (rs.next()) {
-          //preserve errorMessage and state
-          String errorMessage = ci.errorMessage;
-          char state = ci.state;
-          ci = CompactionInfo.loadFullFromCompactionQueue(rs);
-          ci.errorMessage = errorMessage;
-          ci.state = state;
+    CompactionInfo ciActual = new GetCompactionInfoHandler(ci.id, false).execute(dataSourceWrapper);
-          pStmt = dbConn.prepareStatement(DELETE_CQ_ENTRIES);
-          pStmt.setLong(1, ci.id);
-          LOG.debug("Going to execute update <{}>", DELETE_CQ_ENTRIES);
-          pStmt.executeUpdate();
-        }
-        else {
-          if(ci.id > 0) {
-            //the record with valid CQ_ID has disappeared - this is a sign of something wrong
-            throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
-          }
-        }
-        if(ci.id == 0) {
-          //The failure occurred before we even made an entry in COMPACTION_QUEUE
-          //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
-          ci.id = generateCompactionQueueId(stmt);
-          //this is not strictly accurate, but 'type' cannot be null.
-          if(ci.type == null) {
-            ci.type = CompactionType.MINOR;
-          }
-          ci.start = getDbTime(dbConn);
-          LOG.debug("The failure occurred before we even made an entry in COMPACTION_QUEUE. Generated ID so that we "
-              + "can make an entry in COMPLETED_COMPACTIONS. New Id: {}", ci.id);
-        }
-        close(rs, stmt, null);
-        closeStmt(pStmt);
+    long endTime = getDbTime().getTime();
+    if (ciActual != null) {
+      //preserve errorMessage and state
+      ciActual.errorMessage = ci.errorMessage;
+      ciActual.state = ci.state;
-        pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION);
-        CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
-        int updCount = pStmt.executeUpdate();
-        LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
-        closeStmt(pStmt);
-        dbConn.commit();
-      } catch (SQLException e) {
-        LOG.error("Failed to mark compaction request as " + strState + ", rolling back transaction: " + ci, e);
-        rollbackDBConn(dbConn);
-        checkRetryable(e, "updateStatus(" + ci + ")");
-      } finally {
-        close(rs, stmt, null);
-        close(null, pStmt, dbConn);
+      dataSourceWrapper.getJdbcTemplate().update("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = :id",
+          new MapSqlParameterSource("id", ci.id));
+    } else {
+      if (ci.id > 0) {
+        //the record with valid CQ_ID has disappeared - this is a sign of something wrong
+        throw new IllegalStateException("No record with CQ_ID=" + ci.id + " found in COMPACTION_QUEUE");
+      }
+      ciActual = ci;
+    }
+    if (ciActual.id == 0) {
+      //The failure occurred before we even made an entry in COMPACTION_QUEUE
+      //generate ID so that we can make an entry in COMPLETED_COMPACTIONS
+      ciActual.id = generateCompactionQueueId();
+      //this is not strictly accurate, but 'type' cannot be null.
+      if (ciActual.type == null) {
+        ciActual.type = CompactionType.MINOR;
      }
-    } catch (RetryException e) {
-      updateStatus(ci);
+      //in case of creating a new entry start and end time will be the same
+      ciActual.start = endTime;
+      LOG.debug("The failure occurred before we even made an entry in COMPACTION_QUEUE. Generated ID so that we "
+          + "can make an entry in COMPLETED_COMPACTIONS. New Id: {}", ciActual.id);
    }
+
+    new InsertCompactionInfoCommand(ciActual, endTime).execute(dataSourceWrapper);
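
The heart of the removed purgeCompactionHistory body is the per-entity retention count described by its inline comments: walk each entity's compaction history newest-first and mark everything beyond the configured per-state quota for deletion. A self-contained sketch of just that counting idea follows; the record type and retention map are invented for the example, and the real code additionally drops timed-out "did not initiate" entries, which is omitted here.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RetentionSketch {
  /** One row of COMPLETED_COMPACTIONS, reduced to what the count needs. */
  record Row(long id, String entity, char state) { }

  /**
   * Returns the ids to purge. Rows must arrive pre-sorted by entity, then
   * newest id first (mirroring the ORDER BY of the deleted query). Within
   * each entity, the first N rows of a given state are kept, the rest purged.
   */
  static List<Long> idsToPurge(List<Row> rows, Map<Character, Integer> retention) {
    List<Long> purge = new ArrayList<>();
    String lastEntity = null;
    Map<Character, Integer> seen = new HashMap<>();
    for (Row r : rows) {
      if (!r.entity().equals(lastEntity)) {
        lastEntity = r.entity();   // new group: reset the per-state counters
        seen = new HashMap<>();
      }
      int kept = seen.merge(r.state(), 1, Integer::sum);
      if (kept > retention.getOrDefault(r.state(), 0)) {
        purge.add(r.id());         // over quota for this state in this group
      }
    }
    return purge;
  }
}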
Review Comment:
   Inconsistent naming: in some places these new classes are called Handler, in others Command.
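
To make the point concrete, here is a minimal sketch of one possible unification behind a single suffix and a single entry point. Every name in it (DataSourceWrapper, MetastoreCommand, the renamed read class) is hypothetical and not taken from this PR:

// Hypothetical sketch only: one suffix ("Command") and one entry point for
// every single-purpose metastore DB operation, whether it reads or writes.

/** Stand-in for whatever type dataSourceWrapper has in the PR. */
interface DataSourceWrapper { }

/** Unified contract: reads return a value, writes can return Void. */
interface MetastoreCommand<T> {
  T execute(DataSourceWrapper dataSourceWrapper);
}

/** The read path under one convention (was CheckFailedCompactionsHandler). */
class CheckFailedCompactionsCommand implements MetastoreCommand<Boolean> {
  @Override
  public Boolean execute(DataSourceWrapper dataSourceWrapper) {
    return Boolean.FALSE; // real logic would scan COMPLETED_COMPACTIONS
  }
}

/** The write path under the same convention (name unchanged from the PR). */
class InsertCompactionInfoCommand implements MetastoreCommand<Void> {
  @Override
  public Void execute(DataSourceWrapper dataSourceWrapper) {
    return null; // real logic would insert into COMPLETED_COMPACTIONS
  }
}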