[ https://issues.apache.org/jira/browse/HIVE-25898?focusedWorklogId=721184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-721184 ]
ASF GitHub Bot logged work on HIVE-25898:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Feb/22 21:31
            Start Date: 04/Feb/22 21:31
    Worklog Time Spent: 10m
      Work Description: zabetak commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r799803119

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+          CompactorUtil.createThreadFactory(
+              "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
   nit: I would keep the thread name unchanged, just to avoid potentially breaking people's scripts when analyzing stack traces etc.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+          CompactorUtil.createThreadFactory(
+              "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
   nit: You could even avoid introducing a new utility method (which is used in only one place) and a dependency on Guava via:
   ```
   heartbeatExecutor = Executors.newSingleThreadScheduledExecutor((runnable) -> {
     Thread t = new Thread(runnable);
     t.setDaemon(true);
     t.setName("CompactionHeartbeater-" + txnId);
     t.setPriority(Thread.MIN_PRIORITY);
     return t;
   });
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+          CompactorUtil.createThreadFactory(
+              "CompactionTxn Heartbeater - " + txnId, Thread.MIN_PRIORITY, true));

Review comment:
   No strong feelings, up to you :)

##########
File path: itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +173,67 @@ public void tearDown() {
     }
   }
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+        " PARTITIONED BY(bkt INT)" +
+        " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires table to be bucketed
+        " STORED AS ORC TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+        .withFieldDelimiter(',')
+        .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+        .withDatabase(dbName)
+        .withTable(tblName)
+        .withStaticPartitionValues(Arrays.asList("0"))
+        .withAgentInfo("UT_" + Thread.currentThread().getName())
+        .withHiveConf(conf)
+        .withRecordWriter(writer)
+        .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+        " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " PARTITION(bkt=1)" +
+        " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data for " +
+        tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    worker.run();
+
+    //Check if the heartbeating is properly terminated
+    Assert.assertTrue(Thread.getAllStackTraces().keySet()
+        .stream().noneMatch(k -> k.getName().contains("CompactionTxn Heartbeater")));

Review comment:
   Maybe an assertion above, checking that the heartbeater really runs at some point, could be useful.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -657,24 +649,19 @@ private String getWorkerId() {
   /**
    * Keep track of the compaction's transaction and its operations.
    */
-  private class CompactionTxn implements AutoCloseable {
+  class CompactionTxn implements AutoCloseable {

Review comment:
   Do we need to change visibility?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + status + ")";
     }
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
   @veghlaci05 have you seen my comment above?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 seconds, do not wait any longer.", this);
           }
-          LOG.debug("Successfully stopped heartbeating for transaction {}", this);
         } catch (InterruptedException ex) {
-          heartbeatExecutor.shutdownNow();
+          //Caller thread was interrupted while waiting for heartbeater to terminate, nothing to do

Review comment:
   If there is an interrupt during `heartbeatExecutor#awaitTermination`, we will never see another log (nor stack trace) showing that there was an attempt to shut down the heartbeater. Moreover, I don't feel very comfortable about completely swallowing an InterruptedException.
   The least that I would expect here is: `Thread.currentThread().interrupt();` but if you have thoroughly thought about it and it is not necessary, then I trust your judgement.

   TLDR

   Citing "Java Concurrency in Practice":

   Propagate the InterruptedException. This is often the most sensible policy if you can get away with it: just propagate the InterruptedException to your caller. This could involve not catching InterruptedException, or catching it and throwing it again after performing some brief activity-specific cleanup.

   Restore the interrupt. Sometimes you cannot throw InterruptedException, for instance when your code is part of a Runnable. In these situations, you must catch InterruptedException and restore the interrupted status by calling interrupt on the current thread, so that code higher up the call stack can see that an interrupt was issued, as demonstrated in Listing 5.10.

   You can get much more sophisticated with interruption, but these two approaches should work in the vast majority of situations. But there is one thing you should not do with InterruptedException: catch it and do nothing in response. This deprives code higher up on the call stack of the opportunity to act on the interruption, because the evidence that the thread was interrupted is lost. The only situation in which it is acceptable to swallow an interrupt is when you are extending Thread and therefore control all the code higher up on the call stack. Cancellation and interruption are covered in greater detail in Chapter 7.
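The "restore the interrupt" option from the review comment above can be sketched as follows. This is a minimal, standalone illustration and not the code from the pull request: the class name `HeartbeatShutdownSketch`, the boolean return value, and the millisecond timeout parameter are assumptions made for the example. The `main` method submits a task that deliberately ignores interrupts so that `awaitTermination` really is interrupted.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper class, not part of Hive.
class HeartbeatShutdownSketch {

  /**
   * Stops the executor and waits up to timeoutMs for it to terminate.
   * Returns true only if the executor terminated within the timeout.
   */
  static boolean shutdownHeartbeater(ExecutorService heartbeatExecutor, long timeoutMs) {
    if (heartbeatExecutor == null) {
      return true;
    }
    heartbeatExecutor.shutdownNow();
    try {
      return heartbeatExecutor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      // Do not swallow the interrupt: restore the flag so that code
      // higher up the call stack can still see it.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    AtomicBoolean release = new AtomicBoolean(false);
    ExecutorService executor = Executors.newSingleThreadExecutor();
    // A task that ignores interrupts, so the executor cannot terminate yet.
    executor.execute(() -> {
      while (!release.get()) {
        Thread.yield();
      }
    });

    // Simulate an interrupt arriving on the calling thread.
    Thread.currentThread().interrupt();
    boolean terminated = shutdownHeartbeater(executor, 5000);

    // Thread.interrupted() reads and clears the flag.
    boolean restored = Thread.interrupted();
    release.set(true); // let the task finish so the JVM can exit

    System.out.println("terminated=" + terminated + ", interruptRestored=" + restored);
  }
}
```

Whether restoring the flag is strictly required depends on what the calling code does with it afterwards; the sketch only shows that the information survives the `catch` block.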
Issue Time Tracking
-------------------

    Worklog Id:     (was: 721184)
    Time Spent: 2h 40m  (was: 2.5h)

> Compaction txn heartbeating after Worker timeout
> ------------------------------------------------
>
>                 Key: HIVE-25898
>                 URL: https://issues.apache.org/jira/browse/HIVE-25898
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> In some cases, when the compaction transaction is aborted, the heartbeater
> thread is not shut down and keeps heartbeating.
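The behaviour described in the issue, and the ordering change made to `close()` in the reviewed diff, can be illustrated with a small standalone sketch. It is not the Hive `Worker.CompactionTxn` class: `CompactionTxnSketch`, its `failOnCommit` flag, `awaitFirstHeartbeat`, and the 50 ms heartbeat period are all assumptions made for the example, and the heartbeat task is a stand-in for the real metastore call. The thread factory follows the lambda suggested in the review.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a transaction wrapper that heartbeats while open.
class CompactionTxnSketch implements AutoCloseable {
  private final ScheduledExecutorService heartbeatExecutor;
  private final CountDownLatch firstBeat = new CountDownLatch(1);
  private final boolean failOnCommit;

  CompactionTxnSketch(long txnId, boolean failOnCommit) {
    this.failOnCommit = failOnCommit;
    // Daemon, low-priority heartbeat thread created without extra utilities.
    heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
      Thread t = new Thread(runnable);
      t.setDaemon(true);
      t.setName("CompactionHeartbeater-" + txnId);
      t.setPriority(Thread.MIN_PRIORITY);
      return t;
    });
    // Stand-in for the real heartbeat call: just records that it ran.
    heartbeatExecutor.scheduleAtFixedRate(firstBeat::countDown, 0, 50, TimeUnit.MILLISECONDS);
  }

  /** Returns true if at least one heartbeat ran within timeoutMs. */
  boolean awaitFirstHeartbeat(long timeoutMs) {
    try {
      return firstBeat.await(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isHeartbeatTerminated() {
    return heartbeatExecutor.isTerminated();
  }

  @Override
  public void close() throws Exception {
    // Stop heartbeating first, so a failing commit cannot leave it running.
    shutdownHeartbeater();
    if (failOnCommit) {
      throw new IllegalStateException("Simulated commit failure");
    }
  }

  private void shutdownHeartbeater() {
    heartbeatExecutor.shutdownNow();
    try {
      heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    CompactionTxnSketch txn = new CompactionTxnSketch(42L, true);
    System.out.println("heartbeat started: " + txn.awaitFirstHeartbeat(2000));
    try {
      txn.close();
    } catch (Exception e) {
      System.out.println("close failed: " + e.getMessage());
    }
    System.out.println("heartbeat terminated: " + txn.isHeartbeatTerminated());
  }
}
```

Checking `awaitFirstHeartbeat` before closing corresponds to the reviewer's suggestion to assert that the heartbeater really ran at some point, so that a later "no heartbeat thread found" assertion is not trivially true.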