[ https://issues.apache.org/jira/browse/HIVE-26804?focusedWorklogId=836397&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-836397 ]

ASF GitHub Bot logged work on HIVE-26804:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Jan/23 13:07
            Start Date: 02/Jan/23 13:07
    Worklog Time Spent: 10m 
      Work Description: veghlaci05 commented on code in PR #3880:
URL: https://github.com/apache/hive/pull/3880#discussion_r1060001094


##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/process/abort/compaction/AbortCompactionsOperation.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.ddl.process.abort.compaction;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AbortCompactResponse;
+import org.apache.hadoop.hive.metastore.api.AbortCompactionRequest;
+import org.apache.hadoop.hive.metastore.api.AbortCompactionResponseElement;
+import org.apache.hadoop.hive.ql.ddl.DDLOperation;
+import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.ddl.ShowUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Operation process of aborting compactions.
+ */
+public class AbortCompactionsOperation extends DDLOperation<AbortCompactionsDesc> {
+    public AbortCompactionsOperation(DDLOperationContext context, AbortCompactionsDesc desc) {
+        super(context, desc);
+    }
+
+    @Override
+    public int execute() throws HiveException {
+        AbortCompactionRequest request = new AbortCompactionRequest();
+        request.setCompactionIds(desc.getCompactionIds());
+        AbortCompactResponse response = context.getDb().abortCompactions(request);
+        try (DataOutputStream os = ShowUtils.getOutputStream(new Path(desc.getResFile()), context)) {
+            writeHeader(os);
+            if (response.getAbortedcompacts() != null) {
+                for (AbortCompactionResponseElement e : response.getAbortedcompacts()) {
+                    writeRow(os, e);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("show compactions: ", e);

Review Comment:
   Should be "abort compactions"



##########
standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift:
##########
@@ -1393,6 +1393,22 @@ struct ShowCompactResponse {
     1: required list<ShowCompactResponseElement> compacts,
 }
 
+struct AbortCompactionRequest {
+    1: required list<i64> compactionIds,
+    2: optional string type,
+    3: optional string poolName
+}
+
+struct AbortCompactionResponseElement {
+    1: required i64 compactionIds,

Review Comment:
   Should be compactionId
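
   A one-line sketch of the rename, keeping the rest of the proposed struct
as-is:

   ```
   1: required i64 compactionId,
   ```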



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6246,86 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    AbortCompactResponse response = new AbortCompactResponse(new ArrayList<>());
+    List<Long> requestedCompId = reqst.getCompactionIds();
+    if (requestedCompId.isEmpty()) {
+      LOG.info("Compaction ids missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("ompaction ids missing in request. No compactions to abort");
+    }
+    List<AbortCompactionResponseElement> abortCompactionResponseElementList = new ArrayList<>();
+    for (int i = 0; i < requestedCompId.size(); i++) {
+      AbortCompactionResponseElement responseEle = abortCompaction(requestedCompId.get(i));
+      abortCompactionResponseElementList.add(responseEle);
+    }
+    response.setAbortedcompacts(abortCompactionResponseElementList);
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public AbortCompactionResponseElement abortCompaction(Long compId) throws MetaException {
+    try {
+      AbortCompactionResponseElement responseEle = new AbortCompactionResponseElement();
+      responseEle.setCompactionIds(compId);
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex)) {
+        Optional<CompactionInfo> compactionInfo = getCompactionByCompId(dbConn, compId);
+        if (compactionInfo.isPresent()) {
+          try (PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+            CompactionInfo ci = compactionInfo.get();
+            ci.errorMessage = "Compaction aborted by user";
+            ci.state = TxnStore.ABORTED_STATE;
+            CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+            int updCount = pStmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", ci, updCount);

Review Comment:
   At this point execution should move on to the next item: after rolling back
the insert, the code still tries to run the delete command, which makes no
sense. Also, the response element should be set with an appropriate status and
error message before advancing to the next element.
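
   A hedged sketch of that control flow, reusing only names already in this
diff (the early return itself is the suggestion, not code from the PR):

   ```java
   int updCount = pStmt.executeUpdate();
   if (updCount != 1) {
     LOG.error("Unable to update compaction record: {}. updCnt={}", ci, updCount);
     dbConn.rollback();
     // Report the failure on this element and let the caller advance to the
     // next requested compaction id instead of falling through to the DELETE.
     responseEle.setStatus("Error");
     responseEle.setMessage("Error while aborting compaction");
     return responseEle;
   }
   ```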



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6246,86 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    AbortCompactResponse response = new AbortCompactResponse(new ArrayList<>());
+    List<Long> requestedCompId = reqst.getCompactionIds();
+    if (requestedCompId.isEmpty()) {
+      LOG.info("Compaction ids missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("ompaction ids missing in request. No compactions to abort");

Review Comment:
   Please fix: Compaction ids are missing
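
   For example, the corrected line could read:

   ```java
   throw new NoSuchCompactionException("Compaction ids are missing in request. No compactions to abort");
   ```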



##########
standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java:
##########
@@ -50,4 +50,20 @@ public class TxnQueries {
     "  \"CC_HIGHEST_WRITE_ID\"" +
     "FROM " +
     "  \"COMPLETED_COMPACTIONS\" ) XX ";
+
+
+  public static final String SELECT_COMPACTION_QUEUE_BY_COMPID = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", "
+    + "\"CQ_STATE\", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", \"CQ_RUN_AS\", "
+    + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", "
+    + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", "
+    + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\" "
+    + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ? AND \"CQ_STATE\" ='i'";
+
+  public static final String INSERT_INTO_COMPLETED_COMPACTION = "INSERT INTO \"COMPLETED_COMPACTIONS\" "

Review Comment:
   There is a similar query in
org.apache.hadoop.hive.metastore.txn.CompactionTxnHandler.updateStatus; please
use this constant there as well to avoid code duplication.
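
   A minimal sketch of the reuse, assuming updateStatus builds an equivalent
INSERT inline today (its body is not shown in this diff):

   ```java
   // Inside CompactionTxnHandler.updateStatus: reuse the shared constant
   // instead of a locally duplicated INSERT statement.
   try (PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
     CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
     pStmt.executeUpdate();
   }
   ```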



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6246,86 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    AbortCompactResponse response = new AbortCompactResponse(new ArrayList<>());
+    List<Long> requestedCompId = reqst.getCompactionIds();
+    if (requestedCompId.isEmpty()) {
+      LOG.info("Compaction ids missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("ompaction ids missing in request. No compactions to abort");
+    }
+    List<AbortCompactionResponseElement> abortCompactionResponseElementList = new ArrayList<>();
+    for (int i = 0; i < requestedCompId.size(); i++) {
+      AbortCompactionResponseElement responseEle = abortCompaction(requestedCompId.get(i));
+      abortCompactionResponseElementList.add(responseEle);
+    }
+    response.setAbortedcompacts(abortCompactionResponseElementList);
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public AbortCompactionResponseElement abortCompaction(Long compId) throws MetaException {
+    try {
+      AbortCompactionResponseElement responseEle = new AbortCompactionResponseElement();
+      responseEle.setCompactionIds(compId);
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex)) {
+        Optional<CompactionInfo> compactionInfo = getCompactionByCompId(dbConn, compId);
+        if (compactionInfo.isPresent()) {
+          try (PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+            CompactionInfo ci = compactionInfo.get();
+            ci.errorMessage = "Compaction aborted by user";
+            ci.state = TxnStore.ABORTED_STATE;
+            CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+            int updCount = pStmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", ci, updCount);
+              dbConn.rollback();
+            }
+            LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+            try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+              stmt.setLong(1, ci.id);
+              LOG.debug("Going to execute update on COMPACTION_QUEUE <{}>");
+              updCount = stmt.executeUpdate();
+              if (updCount != 1) {
+                LOG.error("Unable to update compaction record: {}. updCnt={}", ci, updCount);
+                dbConn.rollback();
+              } else {
+                responseEle.setMessage("Successfully Aborted Compaction ");
+                responseEle.setStatus("Success");
+                dbConn.commit();
+              }
+            }
+          }
+        } else {
+          responseEle.setMessage("Compaction element not eligible for cancellation");
+          responseEle.setStatus("Error");
+        }
+      } catch (SQLException e) {
+        LOG.error("Failed to abort compaction request");
+        checkRetryable(e, "abortCompaction(" + compId + ")");
+        responseEle.setMessage("Error while aborting compaction");
+        responseEle.setStatus("Error");
+      }

Review Comment:
   I think this method should also catch Exception in a separate block and
return an appropriate list of response elements in all cases. For example, if
two compactions are successfully cancelled but the third one results in a
runtime exception, this method won't return anything; it will throw an
exception instead. As a result, the user will see only the error of the third
compaction, without any information about the other two compactions (which
were cancelled successfully).
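
   One way to get there, sketched against the method as proposed (the extra
catch block is the suggestion, not code from the PR):

   ```java
   } catch (SQLException e) {
     LOG.error("Failed to abort compaction request", e);
     checkRetryable(e, "abortCompaction(" + compId + ")");
     responseEle.setMessage("Error while aborting compaction");
     responseEle.setStatus("Error");
   } catch (RuntimeException e) {
     // Record the failure on this element so earlier successes are still
     // returned to the user.
     LOG.error("Unexpected error while aborting compaction {}", compId, e);
     responseEle.setMessage("Error while aborting compaction");
     responseEle.setStatus("Error");
   }
   ```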



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/process/abort/compaction/AbortCompactionsOperation.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.ddl.process.abort.compaction;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AbortCompactResponse;
+import org.apache.hadoop.hive.metastore.api.AbortCompactionRequest;
+import org.apache.hadoop.hive.metastore.api.AbortCompactionResponseElement;
+import org.apache.hadoop.hive.ql.ddl.DDLOperation;
+import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.ddl.ShowUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+
+/**
+ * Operation process of aborting compactions.
+ */
+public class AbortCompactionsOperation extends DDLOperation<AbortCompactionsDesc> {
+    public AbortCompactionsOperation(DDLOperationContext context, AbortCompactionsDesc desc) {
+        super(context, desc);
+    }
+
+    @Override
+    public int execute() throws HiveException {
+        AbortCompactionRequest request = new AbortCompactionRequest();
+        request.setCompactionIds(desc.getCompactionIds());
+        AbortCompactResponse response = context.getDb().abortCompactions(request);
+        try (DataOutputStream os = ShowUtils.getOutputStream(new Path(desc.getResFile()), context)) {
+            writeHeader(os);
+            if (response.getAbortedcompacts() != null) {
+                for (AbortCompactionResponseElement e : response.getAbortedcompacts()) {
+                    writeRow(os, e);
+                }
+            }
+        } catch (Exception e) {
+            LOG.warn("show compactions: ", e);
+            return 1;
+        }
+        return 0;
+    }
+
+    private void writeHeader(DataOutputStream os) throws IOException {
+        os.writeBytes("CompactionId");
+        os.write(Utilities.tabCode);
+        os.writeBytes("Status");
+        os.write(Utilities.tabCode);
+        os.writeBytes("Message");
+        os.write(Utilities.newLineCode);
+
+    }
+
+    private void writeRow(DataOutputStream os, AbortCompactionResponseElement e) throws IOException {
+        os.writeBytes(Long.toString(e.getCompactionIds()));
+        os.write(Utilities.tabCode);
+        os.writeBytes(e.getStatus());
+        os.write(Utilities.tabCode);
+        os.writeBytes(e.getMessage());
+        os.write(Utilities.tabCode);

Review Comment:
   Is this tab before newline required?
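
   If not, ending the row with a newline would match writeHeader:

   ```java
   os.writeBytes(e.getMessage());
   os.write(Utilities.newLineCode);
   ```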



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6246,86 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {
+    AbortCompactResponse response = new AbortCompactResponse(new ArrayList<>());
+    List<Long> requestedCompId = reqst.getCompactionIds();
+    if (requestedCompId.isEmpty()) {
+      LOG.info("Compaction ids missing in request. No compactions to abort");
+      throw new NoSuchCompactionException("ompaction ids missing in request. No compactions to abort");
+    }
+    List<AbortCompactionResponseElement> abortCompactionResponseElementList = new ArrayList<>();
+    for (int i = 0; i < requestedCompId.size(); i++) {
+      AbortCompactionResponseElement responseEle = abortCompaction(requestedCompId.get(i));
+      abortCompactionResponseElementList.add(responseEle);
+    }
+    response.setAbortedcompacts(abortCompactionResponseElementList);
+    return response;
+  }
+
+  @RetrySemantics.SafeToRetry
+  public AbortCompactionResponseElement abortCompaction(Long compId) throws MetaException {
+    try {
+      AbortCompactionResponseElement responseEle = new AbortCompactionResponseElement();
+      responseEle.setCompactionIds(compId);
+      try (Connection dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolMutex)) {
+        Optional<CompactionInfo> compactionInfo = getCompactionByCompId(dbConn, compId);
+        if (compactionInfo.isPresent()) {
+          try (PreparedStatement pStmt = dbConn.prepareStatement(TxnQueries.INSERT_INTO_COMPLETED_COMPACTION)) {
+            CompactionInfo ci = compactionInfo.get();
+            ci.errorMessage = "Compaction aborted by user";
+            ci.state = TxnStore.ABORTED_STATE;
+            CompactionInfo.insertIntoCompletedCompactions(pStmt, ci, getDbTime(dbConn));
+            int updCount = pStmt.executeUpdate();
+            if (updCount != 1) {
+              LOG.error("Unable to update compaction record: {}. updCnt={}", ci, updCount);
+              dbConn.rollback();
+            }
+            LOG.debug("Inserted {} entries into COMPLETED_COMPACTIONS", updCount);
+            try (PreparedStatement stmt = dbConn.prepareStatement("DELETE FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?")) {
+              stmt.setLong(1, ci.id);
+              LOG.debug("Going to execute update on COMPACTION_QUEUE <{}>");
+              updCount = stmt.executeUpdate();
+              if (updCount != 1) {
+                LOG.error("Unable to update compaction record: {}. updCnt={}", ci, updCount);
+                dbConn.rollback();
+              } else {
+                responseEle.setMessage("Successfully Aborted Compaction ");
+                responseEle.setStatus("Success");
+                dbConn.commit();
+              }
+            }
+          }
+        } else {
+          responseEle.setMessage("Compaction element not eligible for cancellation");
+          responseEle.setStatus("Error");
+        }
+      } catch (SQLException e) {
+        LOG.error("Failed to abort compaction request");
+        checkRetryable(e, "abortCompaction(" + compId + ")");
+        responseEle.setMessage("Error while aborting compaction");
+        responseEle.setStatus("Error");
+      }
+      return responseEle;
+    } catch (RetryException e) {
+      return abortCompaction(compId);
+    }
+
+  }
+
+  private Optional<CompactionInfo> getCompactionByCompId(Connection dbConn, Long compId) throws SQLException, MetaException {

Review Comment:
   This method should take the list of ids and return them in one round trip.
It should join on COMPLETED_COMPACTIONS and should not filter on status, so
the error messages can be more meaningful, for example to differentiate
whether there is no compaction with the given id or it has already been
cancelled/completed.
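
   A sketch of a batched variant; the helper name, the UNION query, and the
COMPLETED_COMPACTIONS columns (CC_ID, CC_STATE) are assumptions here, not code
from this PR:

   ```java
   // Hypothetical: fetch every requested id in one round trip, across both
   // tables and without a state filter, so the caller can tell an unknown id
   // apart from one that already completed or was aborted.
   private Map<Long, CompactionInfo> getCompactionsByCompIds(Connection dbConn, List<Long> compIds)
       throws SQLException {
     String placeholders = compIds.stream().map(id -> "?").collect(Collectors.joining(", "));
     String query =
         "SELECT \"CQ_ID\" AS \"ID\", \"CQ_STATE\" AS \"STATE\" FROM \"COMPACTION_QUEUE\""
         + " WHERE \"CQ_ID\" IN (" + placeholders + ")"
         + " UNION ALL"
         + " SELECT \"CC_ID\" AS \"ID\", \"CC_STATE\" AS \"STATE\" FROM \"COMPLETED_COMPACTIONS\""
         + " WHERE \"CC_ID\" IN (" + placeholders + ")";
     Map<Long, CompactionInfo> found = new HashMap<>();
     try (PreparedStatement stmt = dbConn.prepareStatement(query)) {
       int i = 1;
       for (int round = 0; round < 2; round++) { // bind the ids once per IN clause
         for (Long id : compIds) {
           stmt.setLong(i++, id);
         }
       }
       try (ResultSet rs = stmt.executeQuery()) {
         while (rs.next()) {
           CompactionInfo ci = new CompactionInfo();
           ci.id = rs.getLong("ID");
           ci.state = rs.getString("STATE").charAt(0);
           found.put(ci.id, ci);
         }
       }
     }
     return found; // ids absent from the map exist in neither table
   }
   ```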



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java:
##########
@@ -6242,4 +6246,86 @@ public boolean isWrapperFor(Class<?> iface) throws SQLException {
     }
   }
 
+  @Override
+  @RetrySemantics.SafeToRetry
+  public AbortCompactResponse abortCompactions(AbortCompactionRequest reqst) throws MetaException, NoSuchCompactionException {

Review Comment:
   I think this method should be in CompactionTxnHandler instead.



##########
ql/src/java/org/apache/hadoop/hive/ql/ddl/process/abort/compaction/AbortCompactionsDesc.java:
##########
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.ddl.process.abort.compaction;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.ddl.DDLDesc;
+import org.apache.hadoop.hive.ql.plan.Explain;
+
+import java.io.Serializable;
+import java.util.List;
+/**
+ * DDL task description for ABORT COMPACTIONS commands.
+ */
+@Explain(displayName = "Abort Compaction", explainLevels = { Explain.Level.USER, Explain.Level.DEFAULT, Explain.Level.EXTENDED })
+public class AbortCompactionsDesc implements DDLDesc, Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private final List<Long> compactionIds;
+    private String resFile;

Review Comment:
   should be final





Issue Time Tracking
-------------------

    Worklog Id:     (was: 836397)
    Time Spent: 50m  (was: 40m)

> Cancel Compactions in initiated state
> -------------------------------------
>
>                 Key: HIVE-26804
>                 URL: https://issues.apache.org/jira/browse/HIVE-26804
>             Project: Hive
>          Issue Type: New Feature
>          Components: Hive
>            Reporter: KIRTI RUGE
>            Assignee: KIRTI RUGE
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 50m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
