[ https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841150&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841150 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 23/Jan/23 15:02
            Start Date: 23/Jan/23 15:02
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1084128577


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements, x, "No Such Compaction Id Available", "Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements, compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements,
+                              CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Compaction Aborted by Abort Compaction request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();
+          addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                  "Error while aborting compaction: Unable to update compaction record in COMPLETED_COMPACTIONS", "Error");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                      "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, "Successfully aborted compaction",
+                      "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                    "Error while aborting compaction: " + e.getMessage(), "Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                "Error while aborting compaction: " + e.getMessage(), "Error");
+      }
+    } catch (RetryException e) {
+      abortCompaction(abortCompactionResponseElements, compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN " +
+            "(" + Joiner.on(',').join(requestedCompId) + ") ";
+    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+      try (ResultSet rs = pStmt.executeQuery()) {
+        while (rs.next()) {
+          if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+            compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+          } else {
+            addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1),
+              "Error while aborting compaction as compaction is in state-" +
+                      CompactionState.fromSqlConst(rs.getString(5).charAt(0)), "Error");
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e));
+    }
+    return compactionInfoList;
+  }
+
+  private boolean checkIfCompactionEligibleToAbort(char state) {
+
+    return CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+  }

Review Comment:
   You could eliminate this method completely. It's a one-liner and there's only one usage.
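   
   For illustration, inlining it at the single call site could look roughly like this (a sketch based only on the code quoted above, not code from the PR):
   
   ```
         // hypothetical inlined check; also avoids calling fromSqlConst() twice
         CompactionState state = CompactionState.fromSqlConst(rs.getString(5).charAt(0));
         if (CompactionState.INITIATED.equals(state)) {
           compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
         } else {
           addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1),
               "Error while aborting compaction as compaction is in state-" + state, "Error");
         }
   ```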



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements, x, "No Such Compaction Id Available", "Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements, compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements,
+                              CompactionInfo compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Compaction Aborted by Abort Compaction request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+          dbConn.rollback();
+          addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                  "Error while aborting compaction: Unable to update compaction record in COMPLETED_COMPACTIONS", "Error");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                      "Error while aborting compaction: Unable to update compaction record in COMPACTION_QUEUE", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id, "Successfully aborted compaction",
+                      "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                    "Error while aborting compaction: " + e.getMessage(), "Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(abortCompactionResponseElements, compactionInfo.id,
+                "Error while aborting compaction: " + e.getMessage(), "Error");
+      }
+    } catch (RetryException e) {
+      abortCompaction(abortCompactionResponseElements, compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + " WHERE \"CC_ID\" IN " +
+            "(" + Joiner.on(',').join(requestedCompId) + ") ";
+    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+      try (ResultSet rs = pStmt.executeQuery()) {
+        while (rs.next()) {
+          if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+            compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+          } else {
+            addAbortCompactionResponse(abortCompactionResponseElements, rs.getLong(1),
+              "Error while aborting compaction as compaction is in state-" +
+                      CompactionState.fromSqlConst(rs.getString(5).charAt(0)), "Error");
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new MetaException("Unable to select from transaction database-" + StringUtils.stringifyException(e));
+    }
+    return compactionInfoList;
+  }
+
+  private boolean checkIfCompactionEligibleToAbort(char state) {
+
+    return CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+  }
+
+  private void addAbortCompactionResponse(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements,
+                                          long id, String message, String status) {
+
+    abortCompactionResponseElements.put(id, new AbortCompactionResponseElement(id, status, message));
+  }

Review Comment:
   With the recent changes this method has become a one-liner. You could eliminate it completely.
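   
   For illustration, each call site could then do the put directly (a sketch based on the quoted code, not code from the PR):
   
   ```
               // hypothetical inlining of addAbortCompactionResponse at the success branch
               abortCompactionResponseElements.put(compactionInfo.id,
                   new AbortCompactionResponseElement(compactionInfo.id, "Success", "Successfully aborted compaction"));
   ```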



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3418,6 +3422,100 @@ public void testShowCompactionOrder() throws Exception {
     Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
 
   }
+
+  @Test
+  public void testAbortCompaction() throws Exception {
+
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1') compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+            " values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("drop table if exists T1");
+    runStatementOnDriver("create table T1 (a int, b int) stored as orc TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
+    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+
+    //create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T1 compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    // Verify compaction order
+    List<ShowCompactResponseElement> compacts =
+            txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(6, compacts.size());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(2).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(3).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, compacts.get(4).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+    long initiatedStateCompId = compacts.get(0).getId();
+    List<Long> refusedStateCompIds = Arrays.asList(compacts.get(1).getId(), compacts.get(5).getId());
+    List<Long> compactionsToAbort = Arrays.asList(Long.parseLong("12"), compacts.get(0).getId(),
+            compacts.get(1).getId(), compacts.get(2).getId(), compacts.get(3).getId(), compacts.get(4).getId(),
+            compacts.get(5).getId());

Review Comment:
   You could use a single map here, where the key is the compaction id and the value is an AbortCompactionResponseElement with the expected id, status, and message.
   
   ```
       Map<Long, AbortCompactionResponseElement> map = new HashMap<Long, AbortCompactionResponseElement>() {{
         put(compacts.get(0).getId(), new AbortCompactionResponseElement(compacts.get(0).getId(), "Success", "Successfully aborted compaction"));
   ...
   ...
       }};
   ```
   
   That would greatly simplify the assertions below.
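   
   A rough sketch of how the assertions could then collapse into one loop (assuming the Thrift-generated getter is named getAbortedcompacts() and that AbortCompactionResponseElement has a value-based equals(), as Thrift structs normally do):
   
   ```
       // resp obtained from txnHandler.abortCompactions(...) with the request built earlier in the test
       map.forEach((id, expected) ->
           Assert.assertEquals(expected, resp.getAbortedcompacts().get(id)));
   ```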



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements = new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements, x, "No Such Compaction Id Available", "Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = findEligibleCompactionsToAbort(abortCompactionResponseElements, compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements,

Review Comment:
   You could return an AbortCompactionResponseElement instead of passing the map.
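   
   Roughly, the refactored shape could be (a sketch only; the JDBC logic in the body stays as in the PR, with each branch returning its element instead of calling the helper):
   
   ```
     @RetrySemantics.SafeToRetry
     public AbortCompactionResponseElement abortCompaction(CompactionInfo compactionInfo) throws MetaException {
       // ... same logic as above, but each branch returns
       // new AbortCompactionResponseElement(compactionInfo.id, status, message)
     }
   
     // call site in abortCompactions():
     for (CompactionInfo ci : eligibleCompactionsToAbort) {
       abortCompactionResponseElements.put(ci.id, abortCompaction(ci));
     }
   ```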





Issue Time Tracking
-------------------

    Worklog Id:     (was: 841150)
    Time Spent: 3h 20m  (was: 3h 10m)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 3h 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
