[ https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841150&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841150 ]
ASF GitHub Bot logged work on HIVE-26804: ----------------------------------------- Author: ASF GitHub Bot Created on: 23/Jan/23 15:02 Start Date: 23/Jan/23 15:02 Worklog Time Spent: 10m Work Description: veghlaci05 commented on code in PR #3880: URL: https://github.com/apache/hive/pull/3880#discussion_r1084128577 ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException { } } + @Override + @RetrySemantics.SafeToRetry + public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException { + Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>(); + AbortCompactResponse response = new AbortCompactResponse(new HashMap<>()); + response.setAbortedcompacts(abortCompactionResponseElements); + + List<Long> compactionIdsToAbort = reqst.getCompactionIds(); + if (compactionIdsToAbort.isEmpty()) { + LOG.info("Compaction ids are missing in request. No compactions to abort"); + throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort"); + } + reqst.getCompactionIds().forEach(x -> { + addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such Compaction Id Available","Error"); + }); + + List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort); + for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) { + abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x)); + } + return response; + } + + @RetrySemantics.SafeToRetry + public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements, + CompactionInfo compactionInfo) throws MetaException { + try { + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) { + compactionInfo.state = TxnStore.ABORTED_STATE; + compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction request."; + CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn)); + int updCount = pStmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:Unable to update compaction record in COMPLETED_COMPACTIONS", "Error"); + } else { + LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount); + try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) { + stmt.setLong(1, compactionInfo.id); + LOG.debug("Going to execute update on COMPACTION_QUEUE "); + updCount = stmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE", "Error"); + } else { + dbConn.commit(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, "Successfully aborted compaction", + "Success"); + } + } catch (SQLException e) { + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:"+e.getMessage(), "Error"); + } + } + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database: " + e.getMessage()); + checkRetryable(e, "abortCompaction(" + compactionInfo + ")"); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:"+ e.getMessage(), "Error" ); + } + } catch (RetryException e) { + abortCompaction(abortCompactionResponseElements, compactionInfo); + } + } + + private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long, + AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException { + + List<CompactionInfo> compactionInfoList = new ArrayList<>(); + String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN " + + "(" + Joiner.on(',').join(requestedCompId) + ") "; + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + PreparedStatement pStmt = dbConn.prepareStatement(queryText)) { + try (ResultSet rs = pStmt.executeQuery()) { + while (rs.next()) { + if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) { + compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs)); + } else { + addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1), + "Error while aborting compaction as compaction is in state-" + + CompactionState.fromSqlConst(rs.getString(5).charAt(0)), "Error"); + } + } + } + } catch (SQLException e) { + throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e)); + } + return compactionInfoList; + } + + private boolean checkIfCompactionEligibleToAbort(char state) { + + return CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state)); + } Review Comment: You could eliminate this method completely. It's a one liner and there's only one usage ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException { } } + @Override + @RetrySemantics.SafeToRetry + public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException { + Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>(); + AbortCompactResponse response = new AbortCompactResponse(new HashMap<>()); + response.setAbortedcompacts(abortCompactionResponseElements); + + List<Long> compactionIdsToAbort = reqst.getCompactionIds(); + if (compactionIdsToAbort.isEmpty()) { + LOG.info("Compaction ids are missing in request. No compactions to abort"); + throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort"); + } + reqst.getCompactionIds().forEach(x -> { + addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such Compaction Id Available","Error"); + }); + + List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort); + for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) { + abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x)); + } + return response; + } + + @RetrySemantics.SafeToRetry + public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements, + CompactionInfo compactionInfo) throws MetaException { + try { + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex); + PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) { + compactionInfo.state = TxnStore.ABORTED_STATE; + compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction request."; + CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn)); + int updCount = pStmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:Unable to update compaction record in COMPLETED_COMPACTIONS", "Error"); + } else { + LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount); + try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) { + stmt.setLong(1, compactionInfo.id); + LOG.debug("Going to execute update on COMPACTION_QUEUE "); + updCount = stmt.executeUpdate(); + if (updCount != 1) { + LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount); + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE", "Error"); + } else { + dbConn.commit(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, "Successfully aborted compaction", + "Success"); + } + } catch (SQLException e) { + dbConn.rollback(); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:"+e.getMessage(), "Error"); + } + } + } catch (SQLException e) { + LOG.error("Unable to connect to transaction database: " + e.getMessage()); + checkRetryable(e, "abortCompaction(" + compactionInfo + ")"); + addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, + "Error while aborting compaction:"+ e.getMessage(), "Error" ); + } + } catch (RetryException e) { + abortCompaction(abortCompactionResponseElements, compactionInfo); + } + } + + private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long, + AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException { + + List<CompactionInfo> compactionInfoList = new ArrayList<>(); + String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN " + + "(" + Joiner.on(',').join(requestedCompId) + ") "; + try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + PreparedStatement pStmt = dbConn.prepareStatement(queryText)) { + try (ResultSet rs = pStmt.executeQuery()) { + while (rs.next()) { + if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) { + compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs)); + } else { + addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1), + "Error while aborting compaction as compaction is in state-" + + CompactionState.fromSqlConst(rs.getString(5).charAt(0)), "Error"); + } + } + } + } catch (SQLException e) { + throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e)); + } + return compactionInfoList; + } + + private boolean checkIfCompactionEligibleToAbort(char state) { + + return CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state)); + } + + private void addAbortCompactionResponse(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements, + long id, String message, String status) { + + abortCompactionResponseElements.put(id, new AbortCompactionResponseElement(id, status, message)); + } Review Comment: This the recent changes this method become a one-liner. You could eliminate it completely ########## ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java: ########## @@ -3418,6 +3422,100 @@ public void testShowCompactionOrder() throws Exception { Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState()); } + + @Test + public void testAbortCompaction() throws Exception { + + d.destroy(); + hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict"); + d = new Driver(hiveConf); + //generate some compaction history + runStatementOnDriver("drop database if exists mydb1 cascade"); + runStatementOnDriver("create database mydb1"); + + runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " + + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + + " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')"); + runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'"); + runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'"); + TestTxnCommands2.runWorker(hiveConf); + TestTxnCommands2.runCleaner(hiveConf); + runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'"); + runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + + " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')"); + TestTxnCommands2.runWorker(hiveConf); + TestTxnCommands2.runCleaner(hiveConf); + runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " + + " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')"); + runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MINOR'"); + TestTxnCommands2.runWorker(hiveConf); + + runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " + + BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " + + " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')"); + runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') compact 'MAJOR'"); + TestTxnCommands2.runWorker(hiveConf); + + runStatementOnDriver("drop table if exists T1"); + runStatementOnDriver("create table T1 (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')"); + runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1 + runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2 + + //create failed compaction attempt so that compactor txn is aborted + HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true); + runStatementOnDriver("alter table T1 compact 'minor'"); + TestTxnCommands2.runWorker(hiveConf); + // Verify compaction order + List<ShowCompactResponseElement> compacts = + txnHandler.showCompact(new ShowCompactRequest()).getCompacts(); + Assert.assertEquals(6, compacts.size()); + Assert.assertEquals(TxnStore.INITIATED_RESPONSE, compacts.get(0).getState()); + Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(2).getState()); + Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(3).getState()); + Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(4).getState()); + Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState()); + + long initiatedStateCompId = compacts.get(0).getId(); + List<Long> refusedStateCompIds = Arrays.asList(compacts.get(1).getId(),compacts.get(5).getId()); + List<Long> compactionsToAbort = Arrays.asList(Long.parseLong("12"), compacts.get(0).getId(), + compacts.get(1).getId(), compacts.get(2).getId(), compacts.get(3).getId(), compacts.get(4).getId(), + compacts.get(5).getId()); Review Comment: You could use a single map here where the long is the id and the value is an AbortCompactionResponseElement with the expected id, status, and message. ``` Map<Long, AbortCompactionResponseElement> map = new HashMap<Long, AbortCompactionResponseElement>() {{ put(compacts.get(0).getId(),new AbortCompactionResponseElement(compacts.get(0).getId(), "Suceess", "Successfully aborted compaction")); ... ... }}; ``` That would greatly simplify the assertions below. ########## standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java: ########## @@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException { } } + @Override + @RetrySemantics.SafeToRetry + public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException { + Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>(); + AbortCompactResponse response = new AbortCompactResponse(new HashMap<>()); + response.setAbortedcompacts(abortCompactionResponseElements); + + List<Long> compactionIdsToAbort = reqst.getCompactionIds(); + if (compactionIdsToAbort.isEmpty()) { + LOG.info("Compaction ids are missing in request. No compactions to abort"); + throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort"); + } + reqst.getCompactionIds().forEach(x -> { + addAbortCompactionResponse(abortCompactionResponseElements,x, "No Such Compaction Id Available","Error"); + }); + + List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort); + for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) { + abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x)); + } + return response; + } + + @RetrySemantics.SafeToRetry + public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements, Review Comment: You could return with AbortCompactionResponseElement instead of passing the map Issue Time Tracking ------------------- Worklog Id: (was: 841150) Time Spent: 3h 20m (was: 3h 10m) > Cancel Compactions in initiated state > ------------------------------------- > > Key: HIVE-26804 > URL: https://issues.apache.org/jira/browse/HIVE-26804 > Project: Hive > Issue Type: New Feature > Components: Hive > Reporter: KIRTI RUGE > Assignee: KIRTI RUGE > Priority: Major > Labels: pull-request-available > Time Spent: 3h 20m > Remaining Estimate: 0h > -- This message was sent by Atlassian Jira (v8.20.10#820010)