This is an automated email from the ASF dual-hosted git repository.

klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 40bde37  HIVE-24096: Abort failed compaction's txn on TException or 
IOException (Karen Coppage, reviewed by Peter Vary)
40bde37 is described below

commit 40bde37b161838e9c52f4768400c9b937bd148f4
Author: Karen Coppage <karenlcopp...@gmail.com>
AuthorDate: Thu Sep 3 09:35:38 2020 +0200

    HIVE-24096: Abort failed compaction's txn on TException or IOException 
(Karen Coppage, reviewed by Peter Vary)
    
    Closes #1447.
---
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 35 ++++++++++++----------
 1 file changed, 20 insertions(+), 15 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index b705c96f..605821b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -72,7 +72,7 @@ public class Worker extends RemoteCompactorThread implements 
MetaStoreThread {
   static final private String CLASS_NAME = Worker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
   static final private long SLEEP_TIME = 10000;
-  private static final int NOT_SET = -1;
+  private static final int TXN_ID_NOT_SET = -1;
 
   private String workerName;
 
@@ -142,7 +142,7 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
   }
 
   private void commitTxnIfSet(long compactorTxnId) {
-    if (compactorTxnId != NOT_SET) {
+    if (compactorTxnId != TXN_ID_NOT_SET) {
       try {
         if (msc != null) {
           msc.commitTxn(compactorTxnId);
@@ -392,7 +392,7 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     // so wrap it in a big catch Throwable statement.
     CompactionHeartbeater heartbeater = null;
     CompactionInfo ci = null;
-    long compactorTxnId = NOT_SET;
+    long compactorTxnId = TXN_ID_NOT_SET;
     try {
       if (msc == null) {
         try {
@@ -545,22 +545,16 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
         msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci));
       } catch (Throwable e) {
         LOG.error("Caught exception while trying to compact " + ci +
-                      ".  Marking failed to avoid repeated failures", e);
-        ci.errorMessage = e.getMessage();
-        msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-        msc.abortTxns(Collections.singletonList(compactorTxnId));
-        compactorTxnId = NOT_SET;
+            ".  Marking failed to avoid repeated failures", e);
+        abortCompactionAndMarkFailed(ci, compactorTxnId, e);
       }
     } catch (TException | IOException t) {
       LOG.error("Caught an exception in the main loop of compactor worker " + 
workerName, t);
       try {
-        if (msc != null && ci != null) {
-          ci.errorMessage = t.getMessage();
-          msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
-          compactorTxnId = NOT_SET;
-        }
+        abortCompactionAndMarkFailed(ci, compactorTxnId, t);
       } catch (TException e) {
-        LOG.error("Caught an exception while trying to mark compaction {} as 
failed: {}", ci, e);
+        LOG.error("Caught an exception while trying to mark compaction {} as 
failed: {}" +
+                (compactorTxnId != TXN_ID_NOT_SET ? " or abort txnId " + 
compactorTxnId : "") , ci, e);
       } finally {
         if (msc != null) {
           msc.close();
@@ -569,7 +563,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
       }
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor worker " + 
workerName, t);
-      compactorTxnId = NOT_SET;
     } finally {
       commitTxnIfSet(compactorTxnId);
       if (heartbeater != null) {
@@ -579,6 +572,18 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     return true;
   }
 
+  private void abortCompactionAndMarkFailed(CompactionInfo ci, long 
compactorTxnId, Throwable e) throws TException {
+    if (ci != null) {
+      ci.errorMessage = e.getMessage();
+    }
+    if (msc != null) {
+      msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+      if (compactorTxnId != TXN_ID_NOT_SET) {
+        msc.abortTxns(Collections.singletonList(compactorTxnId));
+      }
+    }
+  }
+
   private void checkInterrupt() throws InterruptedException {
     if (Thread.interrupted()) {
       throw new InterruptedException("Compaction execution is interrupted");

Reply via email to