[ https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=839589&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-839589 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Jan/23 11:25
            Start Date: 17/Jan/23 11:25
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1072059615


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", "Not Eligible"));
+    });
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  private void addAbortCompactionResponse(long id, String message, String status) {
+    abortCompactionResponseElements.put(id, new AbortCompactionResponseElement(id, status, message));
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();

Review Comment:
   addAbortCompactionResponse() should be called here as well, stating that the compaction request could not be identified.
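
   A minimal sketch of what that could look like (the message wording is just a suggestion):
   ```java
           if (updCount != 1) {
             LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
             dbConn.rollback();
             // report the failure to the caller instead of rolling back silently
             addAbortCompactionResponse(compactionInfo.id, "Compaction request could not be identified", "Error");
           }
   ```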



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", "Not Eligible"));
+    });
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  private void addAbortCompactionResponse(long id, String message, String status) {
+    abortCompactionResponseElements.put(id, new AbortCompactionResponseElement(id, status, message));
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction ", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(compactionInfo.id, "Successfully aborted compaction ", "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction ", "Error");

Review Comment:
   The exception message should be added to the error as a reason. 
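
   For example (exact wording is up to you):
   ```java
           } catch (SQLException e) {
             dbConn.rollback();
             // include the exception message so the response explains why the abort failed
             addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction: " + e.getMessage(), "Error");
           }
   ```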



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", "Not Eligible"));
+    });
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  private void addAbortCompactionResponse(long id, String message, String status) {
+    abortCompactionResponseElements.put(id, new AbortCompactionResponseElement(id, status, message));
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction ", "Error");

Review Comment:
   The error message should be more specific, e.g.: "The compaction request could not be deleted from the queue."
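
   Something along these lines (message text is just a suggestion):
   ```java
               addAbortCompactionResponse(compactionInfo.id, "Compaction request could not be deleted from the queue", "Error");
   ```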



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", "Not Eligible"));
+    });
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  private void addAbortCompactionResponse(long id, String message, String status) {
+    abortCompactionResponseElements.put(id, new AbortCompactionResponseElement(id, status, message));
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction ", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(compactionInfo.id, "Successfully aborted compaction ", "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction ", "Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(compactionInfo.id, "Error while aborting compaction ", "Error");

Review Comment:
   The exception message should be added to the error as a reason.



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java:
##########
@@ -2485,6 +2486,83 @@ public void testShowCompactionFilterSortingAndLimit() throws Exception {
 
   }
 
+  @Test
+  public void testAbortCompactions() throws Exception {
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
+    TestTxnCommands2.runInitiator(hiveConf);
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p3')  compact 'MAJOR' pool 'pool0'");
+    TestTxnCommands2.runInitiator(hiveConf);
+    TestTxnCommands2.runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+
+
+    runStatementOnDriver("create table mydb1.tbl2 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " +
+            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p1') compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p3') compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " +
+            " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION (p='p1')  compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl3 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl3" + " PARTITION(ds) " +
+            " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl3" + " PARTITION(ds='today') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    SessionState.get().setCurrentDatabase("mydb1");
+
+    //testing show compaction command
+
+    List<String> r =  runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'initiated'");
+    Assert.assertEquals(3,r.size());
+    List<String>compIdsToAbort = r.stream().skip(1).map(x -> x.split("\t")[0]).collect(Collectors.toList());
+    String abortCompactionCmd = "ABORT COMPACTIONS " +compIdsToAbort.get(0)+"\t"+compIdsToAbort.get(1);
+    r = runStatementOnDriver(abortCompactionCmd);
+    Assert.assertEquals(3,r.size());
+    Assert.assertEquals("CompactionId\tStatus\tMessage", r.get(0));
+    Assert.assertTrue(r.get(1).contains("Successfully Aborted Compaction"));
+    Assert.assertTrue(r.get(2).contains("Successfully Aborted Compaction"));
+
+    abortCompactionCmd = "ABORT COMPACTIONS " +compIdsToAbort.get(0)+"\t"+compIdsToAbort.get(1);
+    r = runStatementOnDriver(abortCompactionCmd);
+    Assert.assertEquals(3,r.size());
+    Assert.assertEquals("CompactionId\tStatus\tMessage", r.get(0));
+    Assert.assertTrue(r.get(1).contains("Error"));

Review Comment:
   Why not do assertions on the Message and Status columns as well?
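
   Given the "CompactionId\tStatus\tMessage" header asserted above, the assertions could look something like this (the exact Message for an already-aborted id is an assumption):
   ```java
       String[] cols = r.get(1).split("\t");
       Assert.assertEquals("Error", cols[1]);    // Status column
       Assert.assertFalse(cols[2].isEmpty());    // Message column should explain why it failed
   ```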



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java:
##########
@@ -2485,6 +2486,83 @@ public void testShowCompactionFilterSortingAndLimit() throws Exception {
 
   }
 
+  @Test
+  public void testAbortCompactions() throws Exception {
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
+    TestTxnCommands2.runInitiator(hiveConf);
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p3')  compact 'MAJOR' pool 'pool0'");
+    TestTxnCommands2.runInitiator(hiveConf);
+    TestTxnCommands2.runWorker(hiveConf);
+    TxnStore txnHandler = TxnUtils.getTxnStore(hiveConf);
+
+
+
+    runStatementOnDriver("create table mydb1.tbl2 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " +
+            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p1') compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION(p='p3') compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl2" + " PARTITION(p) " +
+            " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl2" + " PARTITION (p='p1')  compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl3 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl3" + " PARTITION(ds) " +
+            " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl3" + " PARTITION(ds='today') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    SessionState.get().setCurrentDatabase("mydb1");
+
+    //testing show compaction command
+
+    List<String> r =  runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'initiated'");
+    Assert.assertEquals(3,r.size());

Review Comment:
   You should also assert the state, so we can make sure the new status is set by the abort command.
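
   A sketch of the idea, assuming the abort writes a state that SHOW COMPACTIONS can filter on (the 'aborted' status value here is an assumption):
   ```java
       // after running ABORT COMPACTIONS, verify the state actually changed
       r = runStatementOnDriver("SHOW COMPACTIONS SCHEMA mydb1 STATUS 'aborted'");
       Assert.assertEquals(3, r.size()); // header row + the two aborted compactions
   ```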



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java:
##########
@@ -50,4 +50,22 @@ public class TxnQueries {
     "  \"CC_HIGHEST_WRITE_ID\"" +
     "FROM " +
     "  \"COMPLETED_COMPACTIONS\" ) XX ";
+
+
+  public static final String SELECT_COMPACTION_QUEUE_BY_COMPID = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "

Review Comment:
   This is not used anymore



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3411,6 +3415,73 @@ public void testShowCompactionOrder() throws Exception {
     Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
 
   }
+
+  @Test
+  public void testAbortCompaction() throws Exception {
+
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) 
partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') 
compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') 
compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1')  
compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) 
partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+            " 
values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("drop table if exists T1");
+    runStatementOnDriver("create table T1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
+    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+
+    //create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T1 compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    // Verify  compaction order
+    List<ShowCompactResponseElement> compacts =
+            txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(6, compacts.size());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, 
compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(2).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(3).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
compacts.get(4).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+    List<Long> compactionsToAbort =  
Arrays.asList(compacts.get(0).getId(),compacts.get(1).getId(),compacts.get(3).getId());
+    AbortCompactionRequest rqst=  new AbortCompactionRequest();
+    rqst.setCompactionIds(compactionsToAbort);
+    AbortCompactResponse resp = txnHandler.abortCompactions(rqst);
+    Assert.assertEquals(3,resp.getAbortedcompactsSize());
+    Map<Long,AbortCompactionResponseElement> res = resp.getAbortedcompacts();
+    List<AbortCompactionResponseElement> respList = 
res.values().stream().collect(Collectors.toList());
+    Assert.assertEquals("Not Eligible",respList.get(0).getMessage());
+    Assert.assertEquals("Not Eligible",respList.get(1).getMessage());
+    Assert.assertEquals("Successfully aborted 
Compaction",respList.get(2).getMessage());

Review Comment:
   Is this the first item in the compacts list, just in a different order here? As far as I know, only an Initiated compaction can be aborted. It would be better to check the ids too, just to make sure it's the same one.
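
   Something like this would pin the expectation to concrete ids (message strings copied from the asserts above):
   ```java
       // look the elements up by id instead of relying on the iteration order of the map values
       Assert.assertEquals("Successfully aborted Compaction", res.get(compacts.get(0).getId()).getMessage());
       Assert.assertEquals("Not Eligible", res.get(compacts.get(1).getId()).getMessage());
       Assert.assertEquals("Not Eligible", res.get(compacts.get(3).getId()).getMessage());
   ```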



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3411,6 +3415,73 @@ public void testShowCompactionOrder() throws Exception {
     Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
 
   }
+
+  @Test
+  public void testAbortCompaction() throws Exception {
+
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) 
partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') 
compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') 
compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1')  
compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) 
partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+            " 
values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("drop table if exists T1");
+    runStatementOnDriver("create table T1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
+    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+
+    //create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T1 compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    // Verify  compaction order
+    List<ShowCompactResponseElement> compacts =
+            txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(6, compacts.size());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, 
compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(2).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(3).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
compacts.get(4).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+    List<Long> compactionsToAbort =  
Arrays.asList(compacts.get(0).getId(),compacts.get(1).getId(),compacts.get(3).getId());

Review Comment:
   Why not try to abort the 4th one (compacts.get(4)) as well? A succeeded compaction should also not be abortable.
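
   For example, reusing the indices from the asserts above (compacts.get(4) is the SUCCEEDED one; the expected "Not Eligible" message is an assumption):
   ```java
       List<Long> compactionsToAbort = Arrays.asList(compacts.get(0).getId(), compacts.get(1).getId(),
               compacts.get(3).getId(), compacts.get(4).getId());
       // ... after calling abortCompactions(), the succeeded one should be rejected too:
       Assert.assertEquals("Not Eligible", res.get(compacts.get(4).getId()).getMessage());
   ```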



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6245,91 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new AbortCompactionResponseElement(x, "Error", "Not Eligible"));

Review Comment:
   The default error message should be something like: "Compaction request could not be found". Also, you could iterate over the ids after getting the result of findEligibleCompactionsToAbort, and create the default response element only for the missing items, to avoid creating them twice.
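
   A sketch of the second suggestion (the Set/stream plumbing is just one way to do it):
   ```java
       List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(compactionIdsToAbort);
       Set<Long> eligibleIds = eligibleCompactionsToAbort.stream().map(ci -> ci.id).collect(Collectors.toSet());
       // create the default "not found" element only for ids that turned out not to be eligible
       compactionIdsToAbort.stream()
           .filter(id -> !eligibleIds.contains(id))
           .forEach(id -> abortCompactionResponseElements.put(id,
               new AbortCompactionResponseElement(id, "Error", "Compaction request could not be found")));
   ```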



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -247,6 +248,8 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI {
   private static final int ALLOWED_REPEATED_DEADLOCKS = 10;
   private static final Logger LOG = LoggerFactory.getLogger(TxnHandler.class.getName());
 
+  Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>();

Review Comment:
   Why did you create this field? If the same TxnHandler is shared across multiple threads (requests), this will become a serious issue. You should not store txn-scoped state in such fields.
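
   A minimal sketch of the alternative, keeping the map request-scoped:
   ```java
       // inside abortCompactions(): each request gets its own map, so concurrent
       // requests on a shared TxnHandler cannot see each other's entries
       Map<Long, AbortCompactionResponseElement> responseElements = new HashMap<>();
       AbortCompactResponse response = new AbortCompactResponse(responseElements);
       // ... then pass responseElements down to abortCompaction() instead of
       // writing to an instance field
   ```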



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java:
##########
@@ -65,6 +65,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;
 import org.apache.hadoop.hive.metastore.api.TxnInfo;
 import org.apache.hadoop.hive.metastore.api.TxnState;
+import org.apache.hadoop.hive.metastore.api.AbortCompactResponse;

Review Comment:
   unused import



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3411,6 +3415,73 @@ public void testShowCompactionOrder() throws Exception {
     Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
 
   }
+
+  @Test
+  public void testAbortCompaction() throws Exception {
+
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) 
partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') 
compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') 
compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1')  
compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) 
partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+            " 
values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("drop table if exists T1");
+    runStatementOnDriver("create table T1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
+    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+
+    //create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T1 compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    // Verify  compaction order
+    List<ShowCompactResponseElement> compacts =
+            txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(6, compacts.size());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, 
compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(2).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(3).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
compacts.get(4).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+    List<Long> compactionsToAbort =  
Arrays.asList(compacts.get(0).getId(),compacts.get(1).getId(),compacts.get(3).getId());
+    AbortCompactionRequest rqst=  new AbortCompactionRequest();
+    rqst.setCompactionIds(compactionsToAbort);
+    AbortCompactResponse resp = txnHandler.abortCompactions(rqst);
+    Assert.assertEquals(3,resp.getAbortedcompactsSize());
+    Map<Long,AbortCompactionResponseElement> res = resp.getAbortedcompacts();
+    List<AbortCompactionResponseElement> respList = 
res.values().stream().collect(Collectors.toList());
+    Assert.assertEquals("Not Eligible",respList.get(0).getMessage());
+    Assert.assertEquals("Not Eligible",respList.get(1).getMessage());
+    Assert.assertEquals("Successfully aborted 
Compaction",respList.get(2).getMessage());
+  }

Review Comment:
   nit: new line between methods





Issue Time Tracking
-------------------

    Worklog Id:     (was: 839589)
    Time Spent: 1h 40m  (was: 1.5h)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
