[ 
https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=841972&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-841972
 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 27/Jan/23 10:30
            Start Date: 27/Jan/23 10:30
    Worklog Time Spent: 10m 
      Work Description: rkirtir commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1088805034


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6266,4 +6271,105 @@ public boolean isWrapperFor(Class<?> iface) throws 
SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) 
throws MetaException, NoSuchCompactionException {
+    Map<Long, AbortCompactionResponseElement> abortCompactionResponseElements 
= new HashMap<>();
+    AbortCompactResponse response = new AbortCompactResponse(new HashMap<>());
+    response.setAbortedcompacts(abortCompactionResponseElements);
+
+    List<Long> compactionIdsToAbort = reqst.getCompactionIds();
+    if (compactionIdsToAbort.isEmpty()) {
+      LOG.info("Compaction ids are missing in request. No compactions to 
abort");
+      throw new NoSuchCompactionException("Compaction ids missing in request. 
No compactions to abort");
+    }
+    reqst.getCompactionIds().forEach(x -> {
+      abortCompactionResponseElements.put(x, new 
AbortCompactionResponseElement(x, "Error",
+              "No Such Compaction Id Available"));
+    });
+
+    List<CompactionInfo> eligibleCompactionsToAbort = 
findEligibleCompactionsToAbort(abortCompactionResponseElements,
+            compactionIdsToAbort);
+    for (int x = 0; x < eligibleCompactionsToAbort.size(); x++) {
+      
abortCompactionResponseElements.put(eligibleCompactionsToAbort.get(x).id, 
abortCompaction(eligibleCompactionsToAbort.get(x)));
+    }
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public AbortCompactionResponseElement abortCompaction(CompactionInfo 
compactionInfo) throws MetaException {
+    try {
+      try (Connection dbConn = 
getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex);
+           PreparedStatement pStmt = 
dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+        compactionInfo.state = TxnStore.ABORTED_STATE;
+        compactionInfo.errorMessage = "Comapction Aborted by Abort Comapction 
request.";
+        CompactionInfo.insertIntoCompletedCompactions(pStmt, compactionInfo, 
getDbTime(dbConn));
+        int updCount = pStmt.executeUpdate();
+        if (updCount != 1) {
+          LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+          dbConn.rollback();
+          return new AbortCompactionResponseElement(compactionInfo.id,
+                  "Error", "Error while aborting compaction:Unable to update 
compaction record in COMPLETED_COMPACTIONS");
+        } else {
+          LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", 
updCount);
+          try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM 
\"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+            stmt.setLong(1, compactionInfo.id);
+            LOG.debug("Going to execute update on COMPACTION_QUEUE ");
+            updCount = stmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", 
compactionInfo, updCount);
+              dbConn.rollback();
+              return new AbortCompactionResponseElement(compactionInfo.id,
+                      "Error", "Error while aborting compaction: Unable to 
update compaction record in COMPACTION_QUEUE");
+            } else {
+              dbConn.commit();
+              return new AbortCompactionResponseElement(compactionInfo.id,
+                      "Success", "Successfully aborted compaction");
+            }
+          } catch (SQLException e) {
+            dbConn.rollback();
+            return new AbortCompactionResponseElement(compactionInfo.id,
+                    "Error", "Error while aborting compaction:" + 
e.getMessage());
+          }
+        }
+      } catch (SQLException e) {
+        LOG.error("Unable to connect to transaction database: " + 
e.getMessage());
+        checkRetryable(e, "abortCompaction(" + compactionInfo + ")");
+        return new AbortCompactionResponseElement(compactionInfo.id,
+                "Error", "Error while aborting compaction:" + e.getMessage());
+      }
+    } catch (RetryException e) {
+      return abortCompaction(compactionInfo);
+    }
+  }
+
+  private List<CompactionInfo> findEligibleCompactionsToAbort(Map<Long,
+          AbortCompactionResponseElement> abortCompactionResponseElements, 
List<Long> requestedCompId) throws MetaException {
+
+    List<CompactionInfo> compactionInfoList = new ArrayList<>();
+    String queryText = TxnQueries.SELECT_COMPACTION_QUEUE_BY_COMPID + "  WHERE 
\"CC_ID\" IN (?) " ;
+    String sqlIN = requestedCompId.stream()
+            .map(x -> String.valueOf(x))
+            .collect(Collectors.joining(",", "(", ")"));
+    queryText = queryText.replace("(?)", sqlIN);

Review Comment:
   There was a code smell warning earlier about dynamic formatting of SQL





Issue Time Tracking
-------------------

    Worklog Id:     (was: 841972)
    Time Spent: 4.5h  (was: 4h 20m)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 4.5h
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to