This is an automated email from the ASF dual-hosted git repository. klcopp pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 40bde37 HIVE-24096: Abort failed compaction's txn on TException or IOException (Karen Coppage, reviewed by Peter Vary) 40bde37 is described below commit 40bde37b161838e9c52f4768400c9b937bd148f4 Author: Karen Coppage <karenlcopp...@gmail.com> AuthorDate: Thu Sep 3 09:35:38 2020 +0200 HIVE-24096: Abort failed compaction's txn on TException or IOException (Karen Coppage, reviewed by Peter Vary) Closes #1447. --- .../hadoop/hive/ql/txn/compactor/Worker.java | 35 ++++++++++++---------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index b705c96f..605821b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -72,7 +72,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); static final private long SLEEP_TIME = 10000; - private static final int NOT_SET = -1; + private static final int TXN_ID_NOT_SET = -1; private String workerName; @@ -142,7 +142,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { } private void commitTxnIfSet(long compactorTxnId) { - if (compactorTxnId != NOT_SET) { + if (compactorTxnId != TXN_ID_NOT_SET) { try { if (msc != null) { msc.commitTxn(compactorTxnId); @@ -392,7 +392,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { // so wrap it in a big catch Throwable statement. CompactionHeartbeater heartbeater = null; CompactionInfo ci = null; - long compactorTxnId = NOT_SET; + long compactorTxnId = TXN_ID_NOT_SET; try { if (msc == null) { try { @@ -545,22 +545,16 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)); } catch (Throwable e) { LOG.error("Caught exception while trying to compact " + ci + - ". Marking failed to avoid repeated failures", e); - ci.errorMessage = e.getMessage(); - msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); - msc.abortTxns(Collections.singletonList(compactorTxnId)); - compactorTxnId = NOT_SET; + ". Marking failed to avoid repeated failures", e); + abortCompactionAndMarkFailed(ci, compactorTxnId, e); } } catch (TException | IOException t) { LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t); try { - if (msc != null && ci != null) { - ci.errorMessage = t.getMessage(); - msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); - compactorTxnId = NOT_SET; - } + abortCompactionAndMarkFailed(ci, compactorTxnId, t); } catch (TException e) { - LOG.error("Caught an exception while trying to mark compaction {} as failed: {}", ci, e); + LOG.error("Caught an exception while trying to mark compaction {} as failed: {}" + + (compactorTxnId != TXN_ID_NOT_SET ? " or abort txnId " + compactorTxnId : "") , ci, e); } finally { if (msc != null) { msc.close(); @@ -569,7 +563,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { } } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker " + workerName, t); - compactorTxnId = NOT_SET; } finally { commitTxnIfSet(compactorTxnId); if (heartbeater != null) { @@ -579,6 +572,18 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { return true; } + private void abortCompactionAndMarkFailed(CompactionInfo ci, long compactorTxnId, Throwable e) throws TException { + if (ci != null) { + ci.errorMessage = e.getMessage(); + } + if (msc != null) { + msc.markFailed(CompactionInfo.compactionInfoToStruct(ci)); + if (compactorTxnId != TXN_ID_NOT_SET) { + msc.abortTxns(Collections.singletonList(compactorTxnId)); + } + } + } + private void checkInterrupt() throws InterruptedException { if (Thread.interrupted()) { throw new InterruptedException("Compaction execution is interrupted");