[ https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841972 ]
ASF GitHub Bot logged work on HIVE-26804: ----------------------------------------- Author: ASF GitHub Bot Created on: 27/Jan/23 10:30 Start Date: 27/Jan/23 10:30 Worklog Time Spent: 10m Work Description: rkirtir commented on code in PR #3880: URL: https://github.com/apache/hive/pull/3880#discussion_r1088805034 ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException { } } + @Override + @RetrySemantics.SafeToRetry + public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException { + Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>(); + AbortCompactResponse response = new AbortCompactResponse(new HashMap<>()); + response.setAbortedcompacts(abortCompactionResponseElements); + + List<Long> compactionIdsToAbort = reqst.getCompactionIds(); + if (compactionIdsToAbort.isEmpty()) { + LOG.info("Compaction ids are missing in request. No compactions to abort"); + throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort"); + } + reqst.getCompactionIds().forEach(x -> { + abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", + "No Such Compaction Id Available")); + }); + + List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements, + compactionIdsToAbort); + for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) { + abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id, abortCompaction(eligibleCompactionsToAbort.get(x))); + } + return response; + } + + @RetrySemantics.SafeToRetry + public AbortCompactionResponseElement abortCompaction(CompactionInfo compactionInfo) throws MetaException { + try { + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) { + compactionInfo.state = TxnStore.ABORTED_STATE; + compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction request."; + CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn)); + int updCount = pStmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction:Unable to update compaction record in COMPLETED_COMPACTIONS"); + } else { + LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount); + try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) { + stmt.setLong(1, compactionInfo.id); + LOG.debug("Going to execute update on COMPACTION_QUEUE "); + updCount = stmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE"); + } else { + dbConn.commit(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Success", "Successfully aborted compaction"); + } + } catch (SQLException e) { + dbConn.rollback(); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction:" + e.getMessage()); + } + } + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database: " + e.getMessage()); + checkRetryable(e, "abortCompaction(" + compactionInfo + ")"); + return new AbortCompactionResponseElement(compactionInfo.id, + "Error", "Error while aborting compaction:" + e.getMessage()); + } + } catch (RetryException e) { + return abortCompaction(compactionInfo); + } + } + + private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long, + AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException { + + List<CompactionInfo> compactionInfoList = new ArrayList<>(); + String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN (?) " ; + String sqlIN = requestedCompId.stream() + .map(x -> String.valueOf(x)) + .collect(Collectors.joining(",", "(", ")")); + queryText = queryText.replace("(?)", sqlIN); Review Comment: There was code smell warning before for dynamic formating of sql Issue Time Tracking ------------------- Worklog Id: (was: 841972) Time Spent: 4.5h (was: 4h 20m) > Cancel Compactions in initiated state > ------------------------------------- > > Key: HIVE-26804 > URL: https://issues.apache.org/jira/browse/HIVE-26804 > Project: Hive > Issue Type: New Feature > Components: Hive > Reporter: KIRTI RUGE > Assignee: KIRTI RUGE > Priority: Major > Labels: pull-request-available > Time Spent: 4.5h > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)