veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1084128577


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements,x,  "No Such 
Compaction Id Available","Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, 
eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> 
abortCompactionResponseElements,
+                              CompactionInfo compactionInfo) throws 
MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction 
request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+          addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                  "Error while aborting compaction:Unable to update compaction 
record in COMPLETED_COMPACTIONS", "Error");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                      "Error while aborting compaction: Unable to update 
compaction record in COMPACTION_QUEUE", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id, "Successfully aborted compaction",
+                      "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                    "Error while aborting compaction:"+e.getMessage(), 
"Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                "Error while aborting compaction:"+ e.getMessage(), "Error" );
+      }
+    } catch (RetryException e) {
+      abortCompaction(abortCompactionResponseElements, compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, 
List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + "  WHERE 
\"CC_ID\" IN " +
+            "(" + Joiner.on(',').join(requestedCompId) + ") ";
+    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+      try (ResultSet rs = pStmt.executeQuery()) {
+        while (rs.next()) {
+          if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+            
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+          } else {
+            addAbortCompactionResponse(abortCompactionResponseElements, 
rs.getLong(1),
+              "Error while aborting compaction as compaction is in state-" +
+                      CompactionState.fromSqlConst(rs.getString(5).charAt(0)), 
"Error");
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new MetaException("Unable to select from transaction database-" + 
StringUtils.stringifyException(e));
+    }
+    return compactionInfoList;
+  }
+
+  private boolean checkIfCompactionEligibleToAbort(char state) {
+
+    return 
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+  }

Review Comment:
   You could eliminate this method completely. It's a one liner and there's 
only one usage



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements,x,  "No Such 
Compaction Id Available","Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, 
eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> 
abortCompactionResponseElements,
+                              CompactionInfo compactionInfo) throws 
MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction 
request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+          addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                  "Error while aborting compaction:Unable to update compaction 
record in COMPLETED_COMPACTIONS", "Error");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                      "Error while aborting compaction: Unable to update 
compaction record in COMPACTION_QUEUE", "Error");
+            } else {
+              dbConn.commit();
+              addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id, "Successfully aborted compaction",
+                      "Success");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                    "Error while aborting compaction:"+e.getMessage(), 
"Error");
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        addAbortCompactionResponse(abortCompactionResponseElements, 
compactionInfo.id,
+                "Error while aborting compaction:"+ e.getMessage(), "Error" );
+      }
+    } catch (RetryException e) {
+      abortCompaction(abortCompactionResponseElements, compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, 
List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + "  WHERE 
\"CC_ID\" IN " +
+            "(" + Joiner.on(',').join(requestedCompId) + ") ";
+    try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED);
+         PreparedStatement pStmt = dbConn.prepareStatement(queryText)) {
+      try (ResultSet rs = pStmt.executeQuery()) {
+        while (rs.next()) {
+          if (checkIfCompactionEligibleToAbort(rs.getString(5).charAt(0))) {
+            
compactionInfoList.add(CompactionInfo.loadFullFromCompactionQueue(rs));
+          } else {
+            addAbortCompactionResponse(abortCompactionResponseElements, 
rs.getLong(1),
+              "Error while aborting compaction as compaction is in state-" +
+                      CompactionState.fromSqlConst(rs.getString(5).charAt(0)), 
"Error");
+          }
+        }
+      }
+    } catch (SQLException e) {
+      throw new MetaException("Unable to select from transaction database-" + 
StringUtils.stringifyException(e));
+    }
+    return compactionInfoList;
+  }
+
+  private boolean checkIfCompactionEligibleToAbort(char state) {
+
+    return 
CompactionState.INITIATED.equals(CompactionState.fromSqlConst(state));
+  }
+
+  private void addAbortCompactionResponse(Map<Long, 
AbortCompactionResponseElement> abortCompactionResponseElements,
+                                          long id, String message, String 
status) {
+
+    abortCompactionResponseElements.put(id, new 
AbortCompactionResponseElement(id, status, message));
+  }

Review Comment:
   This the recent changes this method become a one-liner. You could eliminate 
it completely



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java:
##########
@@ -3418,6 +3422,100 @@ public void testShowCompactionOrder() throws Exception {
     Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
 
   }
+
+  @Test
+  public void testAbortCompaction() throws Exception {
+
+    d.destroy();
+    hiveConf.setVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE, "nonstrict");
+    d = new Driver(hiveConf);
+    //generate some compaction history
+    runStatementOnDriver("drop database if exists mydb1 cascade");
+    runStatementOnDriver("create database mydb1");
+
+    runStatementOnDriver("create table mydb1.tbl0 " + "(a int, b int) 
partitioned by (p string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(1,2,'p1'),(3,4,'p1'),(1,2,'p2'),(3,4,'p2'),(1,2,'p3'),(3,4,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p1') 
compact 'MAJOR'");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p2') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION(p='p3') 
compact 'MAJOR'");
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(4,5,'p1'),(6,7,'p1'),(4,5,'p2'),(6,7,'p2'),(4,5,'p3'),(6,7,'p3')");
+    TestTxnCommands2.runWorker(hiveConf);
+    TestTxnCommands2.runCleaner(hiveConf);
+    runStatementOnDriver("insert into mydb1.tbl0" + " PARTITION(p) " +
+            " 
values(11,12,'p1'),(13,14,'p1'),(11,12,'p2'),(13,14,'p2'),(11,12,'p3'),(13,14,'p3')");
+    runStatementOnDriver("alter table mydb1.tbl0" + " PARTITION (p='p1')  
compact 'MINOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("create table mydb1.tbl1 " + "(a int, b int) 
partitioned by (ds string) clustered by (a) into " +
+            BUCKET_COUNT + " buckets stored as orc TBLPROPERTIES 
('transactional'='true')");
+    runStatementOnDriver("insert into mydb1.tbl1" + " PARTITION(ds) " +
+            " 
values(1,2,'today'),(3,4,'today'),(1,2,'tomorrow'),(3,4,'tomorrow'),(1,2,'yesterday'),(3,4,'yesterday')");
+    runStatementOnDriver("alter table mydb1.tbl1" + " PARTITION(ds='today') 
compact 'MAJOR'");
+    TestTxnCommands2.runWorker(hiveConf);
+
+    runStatementOnDriver("drop table if exists T1");
+    runStatementOnDriver("create table T1 (a int, b int) stored as orc 
TBLPROPERTIES ('transactional'='true')");
+    runStatementOnDriver("insert into T1 values(0,2)");//makes delta_1_1 in T1
+    runStatementOnDriver("insert into T1 values(1,4)");//makes delta_2_2 in T2
+
+    //create failed compaction attempt so that compactor txn is aborted
+    HiveConf.setBoolVar(hiveConf, 
HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION, true);
+    runStatementOnDriver("alter table T1 compact 'minor'");
+    TestTxnCommands2.runWorker(hiveConf);
+    // Verify  compaction order
+    List<ShowCompactResponseElement> compacts =
+            txnHandler.showCompact(new ShowCompactRequest()).getCompacts();
+    Assert.assertEquals(6, compacts.size());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, 
compacts.get(0).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(1).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(2).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
compacts.get(3).getState());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
compacts.get(4).getState());
+    Assert.assertEquals(TxnStore.REFUSED_RESPONSE, compacts.get(5).getState());
+
+    long initiatedStateCompId = compacts.get(0).getId();
+    List<Long> refusedStateCompIds = 
Arrays.asList(compacts.get(1).getId(),compacts.get(5).getId());
+    List<Long> compactionsToAbort = Arrays.asList(Long.parseLong("12"), 
compacts.get(0).getId(),
+            compacts.get(1).getId(), compacts.get(2).getId(), 
compacts.get(3).getId(), compacts.get(4).getId(),
+            compacts.get(5).getId());

Review Comment:
   You could use a single map here where the long is the id and the value is an 
AbortCompactionResponseElement with the expected id, status, and message. 
   
   ```
   
       Map<Long, AbortCompactionResponseElement> map = new HashMap<Long, 
AbortCompactionResponseElement>() {{
         put(compacts.get(0).getId(),new 
AbortCompactionResponseElement(compacts.get(0).getId(), "Suceess", 
"Successfully aborted compaction"));
   ...
   ...
       }};
   
   ```
   
   That would greatly simplify the assertions below.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,110 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      addAbortCompactionResponse(abortCompactionResponseElements,x,  "No Such 
Compaction Id Available","Error");
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      abortCompaction(abortCompactionResponseElements, 
eligibleCompactionsToAbort.get(x));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public void abortCompaction(Map<Long, AbortCompactionResponseElement> 
abortCompactionResponseElements,

Review Comment:
   You could return with AbortCompactionResponseElement instead of passing the 
map



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to