[ 
https://issues.apache.org/jira/browse/HIVE-25898?focusedWorklogId=721184&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-721184
 ]

ASF GitHub Bot logged work on HIVE-25898:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Feb/22 21:31
            Start Date: 04/Feb/22 21:31
    Worklog Time Spent: 10m 
      Work Description: zabetak commented on a change in pull request #2981:
URL: https://github.com/apache/hive/pull/2981#discussion_r799803119



##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + 
res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, 
Thread.MIN_PRIORITY, true));

Review comment:
       nit: I would keep the thread name unchanged just to avoid potentially 
breaking people scripts when analyzing stack traces etc.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + 
res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, 
Thread.MIN_PRIORITY, true));

Review comment:
       nit: You could even avoid introducing a new utility method (which is 
used in only one place) and dependency to Guava via:
   ```
   heartbeatExecutor = Executors.newSingleThreadScheduledExecutor((runnable) -> 
{
           Thread t = new Thread(runnable);
           t.setDaemon(true);
           t.setName("CompactionHeartbeater-" + txnId);
           t.setPriority(Thread.MIN_PRIORITY);
           return t;
         });
   ```

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -685,11 +672,16 @@ void open(CompactionInfo ci) throws TException {
             + "}, status {" + res.getState() + "}, reason {" + 
res.getErrorMessage() + "}");
       }
       lockId = res.getLockid();
-
-      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor(
+              CompactorUtil.createThreadFactory(
+                      "CompactionTxn Heartbeater - " + txnId, 
Thread.MIN_PRIORITY, true));

Review comment:
       No strong feelings up to you :)

##########
File path: 
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java
##########
@@ -170,6 +173,67 @@ public void tearDown() {
     }
   }
 
+
+  @Test
+  public void testHeartbeatShutdownOnFailedCompaction() throws Exception {
+    String dbName = "default";
+    String tblName = "compaction_test";
+    executeStatementOnDriver("drop table if exists " + tblName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tblName + "(a INT, b STRING) " +
+            " PARTITIONED BY(bkt INT)" +
+            " CLUSTERED BY(a) INTO 4 BUCKETS" + //currently ACID requires 
table to be bucketed
+            " STORED AS ORC  TBLPROPERTIES ('transactional'='true')", driver);
+
+    StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder()
+            .withFieldDelimiter(',')
+            .build();
+    HiveStreamingConnection connection = HiveStreamingConnection.newBuilder()
+            .withDatabase(dbName)
+            .withTable(tblName)
+            .withStaticPartitionValues(Arrays.asList("0"))
+            .withAgentInfo("UT_" + Thread.currentThread().getName())
+            .withHiveConf(conf)
+            .withRecordWriter(writer)
+            .connect();
+    connection.beginTransaction();
+    connection.write("55, 'London'".getBytes());
+    connection.commitTransaction();
+    connection.beginTransaction();
+    connection.write("56, 'Paris'".getBytes());
+    connection.commitTransaction();
+    connection.close();
+
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " 
PARTITION(bkt=1)" +
+            " values(57, 'Budapest')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tblName + " 
PARTITION(bkt=1)" +
+            " values(58, 'Milano')", driver);
+    execSelectAndDumpData("select * from " + tblName, driver, "Dumping data 
for " +
+            tblName + " after load:");
+
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+
+    // Commit will throw an exception
+    IMetaStoreClient mockedClient = Mockito.spy(new HiveMetaStoreClient(conf));
+    doThrow(new RuntimeException("Simulating RuntimeException from 
CompactionTxn.commit")).when(mockedClient).commitTxn(Mockito.anyLong());
+
+    //Do a major compaction
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MAJOR);
+    rqst.setPartitionname("bkt=0");
+    txnHandler.compact(rqst);
+
+    Worker worker = Mockito.spy(new Worker());
+    worker.setThreadId((int) worker.getId());
+    worker.setConf(conf);
+    worker.init(new AtomicBoolean(true));
+    FieldSetter.setField(worker, 
RemoteCompactorThread.class.getDeclaredField("msc"), mockedClient);
+
+    worker.run();
+
+    //Check if the heartbeating is properly terminated
+    Assert.assertTrue(Thread.getAllStackTraces().keySet()
+            .stream().noneMatch(k -> k.getName().contains("CompactionTxn 
Heartbeater")));

Review comment:
       Maybe an assertion above that it really runs at some point could be 
useful.

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -657,24 +649,19 @@ private String getWorkerId() {
   /**
    * Keep track of the compaction's transaction and its operations.
    */
-  private class CompactionTxn implements AutoCloseable {
+  class CompactionTxn implements AutoCloseable {

Review comment:
       Do we need to change visibility?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -749,44 +740,39 @@ long getLockId() {
       return lockId;
     }
 
+    boolean isHeartbeatTerminated() {
+      return heartbeatExecutor == null || heartbeatExecutor.isTerminated();
+    }
+
     @Override public String toString() {
       return "txnId=" + txnId + ", lockId=" + lockId + " (TxnStatus: " + 
status + ")";
     }
 
     /**
      * Commit the txn if open.
      */
-    private void commit() {
-      if (msc == null) {
-        LOG.error("Metastore client was null. Could not commit txn " + this);
-        return;
-      }
+    private void commit() throws TException {
       if (status == TxnStatus.OPEN) {
-        try {
-          msc.commitTxn(txnId);
-          status = TxnStatus.COMMITTED;
-        } catch (TException e) {
-          LOG.error("Caught an exception while committing compaction txn in 
worker " + workerName, e);
-        }
+        msc.commitTxn(txnId);

Review comment:
       @veghlaci05 have you seen my comment above?

##########
File path: ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
##########
@@ -715,28 +705,26 @@ void wasSuccessful() {
      * @throws Exception
      */
     @Override public void close() throws Exception {
+      //the transaction is about to close, we can stop heartbeating regardless 
of it's state
+      shutdownHeartbeater();
       if (status != TxnStatus.UNKNOWN) {
-        // turn off error logging in heartbeater in case of race condition 
between commit/abort and heartbeating
-        heartbeater.shouldLogError(false);
         if (succeessfulCompaction) {
           commit();
         } else {
           abort();
         }
       }
-      shutdownHeartbeater();
     }
 
     private void shutdownHeartbeater() {
       if (heartbeatExecutor != null) {
         heartbeatExecutor.shutdownNow();
         try {
           if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
-            heartbeatExecutor.shutdownNow();
+            LOG.debug("Heartbeating for transaction {} did not stop in 5 
seconds, do not wait any longer.", this);
           }
-          LOG.debug("Successfully stopped heartbeating for transaction {}", 
this);
         } catch (InterruptedException ex) {
-          heartbeatExecutor.shutdownNow();
+          //Caller thread was interrupted while waiting for heartbeater to 
terminate, nothing to do

Review comment:
       If there is an interrupt during `heartbeatExecutor#awaitTermination` we 
will never see another log (nor stacktrace) that there was an attempt to 
shutdown the heartbeater. Moreover, I don't feel very comfortable about 
swallowing completely an interrupted exception. The least that I would expect 
here is:
   
   `Thread.currentThread().interrupt();`
   
   but if you have thoroughly though about it and it is not necessary then I 
trust your judgement.
   
   TLDR 
   Citing the "Java Concurrency in Practice":
   Propagate the InterruptedException . This is often the most sensible policy 
if you can get away with it just propagate the InterruptedException to your 
caller. This could involve not catching InterruptedException , or catching it 
and throwing it again after performing some brief activity specific cleanup.
   
   Restore the interrupt. Sometimes you cannot throw InterruptedException , for 
instance when your code is part of a Runnable . In these situations, you must 
catch InterruptedException and restore the interrupted status by calling 
interrupt on the current thread, so that code higher up the call stack can see 
that an interrupt was issued, as demonstrated in Listing 5.10.
   
   You can get much more sophisticated with interruption, but these two 
approaches should work in the vast majority of situations. But there is one 
thing you should not do with InterruptedException catch it and do nothing in 
response. This deprives code higher up on the call stack of the opportunity to 
act on the interruption, because the evidence that the thread was interrupted 
is lost. The only situation in which it is acceptable to swallow an interrupt 
is when you are extending Thread and therefore control all the code higher up 
on the call stack. Cancellation and interruption are covered in greater detail 
in Chapter 7.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: gitbox-unsubscr...@hive.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 721184)
    Time Spent: 2h 40m  (was: 2.5h)

> Compaction txn heartbeating after Worker timeout
> ------------------------------------------------
>
>                 Key: HIVE-25898
>                 URL: https://issues.apache.org/jira/browse/HIVE-25898
>             Project: Hive
>          Issue Type: Bug
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 2h 40m
>  Remaining Estimate: 0h
>
> In some cases, when the compaction transaction is aborted, the hearbeater 
> thread is not shut down and keeps heartbeating.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to