This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new e8fca5116d3 HIVE-26770: Make end of loop compaction logs appear more selectively and reduce code duplication (Akshat Mathur, reviewed by Denys Kuzmenko)
e8fca5116d3 is described below

commit e8fca5116d34bc915f0575fba360a267ff59004b
Author: Akshat <akshatats...@gmail.com>
AuthorDate: Wed Dec 7 15:44:34 2022 +0530

    HIVE-26770: Make end of loop compaction logs appear more selectively and reduce code duplication (Akshat Mathur, reviewed by Denys Kuzmenko)
    
    Closes #3832
---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  8 ++------
 .../hive/ql/txn/compactor/CompactorThread.java     | 24 +++++++++++++++++++++-
 .../hadoop/hive/ql/txn/compactor/Initiator.java    |  6 +-----
 .../hadoop/hive/ql/txn/compactor/Worker.java       |  6 +++++-
 4 files changed, 31 insertions(+), 13 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 2dc2fa873ed..9d203c880ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -95,7 +95,6 @@ public class Cleaner extends MetaStoreCompactorThread {
 
   static final private String CLASS_NAME = Cleaner.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  private long cleanerCheckInterval = 0;
   private boolean metricsEnabled = false;
 
   private ReplChangeManager replChangeManager;
@@ -105,7 +104,7 @@ public class Cleaner extends MetaStoreCompactorThread {
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
     replChangeManager = ReplChangeManager.getInstance(conf);
-    cleanerCheckInterval = conf.getTimeVar(
+    checkInterval = conf.getTimeVar(
             HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS);
     cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
             conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
@@ -181,10 +180,7 @@ public class Cleaner extends MetaStoreCompactorThread {
         }
         // Now, go back to bed until it's time to do this again
         long elapsedTime = System.currentTimeMillis() - startedAt;
-        if (elapsedTime < cleanerCheckInterval && !stop.get()) {
-          Thread.sleep(cleanerCheckInterval - elapsedTime);
-        }
-        LOG.debug("Cleaner thread finished one loop.");
+        doPostLoopActions(elapsedTime, CompactorThreadType.CLEANER);
       } while (!stop.get());
     } catch (InterruptedException ie) {
       LOG.error("Compactor cleaner thread interrupted, exiting " +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 167b2728f54..215e38d37c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -61,6 +61,14 @@ public abstract class CompactorThread extends Thread implements Configurable {
   protected String hostName;
   protected String runtimeVersion;
 
+  //Time threshold for compactor thread log
+  //In milliseconds:
+  private static final Integer MAX_WARN_LOG_TIME = 1200000; //20 min
+
+  protected long checkInterval = 0;
+
+  enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
+
   @Override
   public void setConf(Configuration configuration) {
     // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with
@@ -196,7 +204,7 @@ public abstract class CompactorThread extends Thread implements Configurable {
   protected String getRuntimeVersion() {
     return this.getClass().getPackage().getImplementationVersion();
   }
-  
+
   protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) {
     String agentInfo = Thread.currentThread().getName();
     LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
@@ -219,4 +227,18 @@ public abstract class CompactorThread extends Thread implements Configurable {
       !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
     return requestBuilder.build();
   }
+
+  protected void doPostLoopActions(long elapsedTime, CompactorThreadType type) throws InterruptedException {
+    String threadTypeName = type.name();
+    if (elapsedTime < checkInterval && !stop.get()) {
+      Thread.sleep(checkInterval - elapsedTime);
+    }
+
+    if (elapsedTime < MAX_WARN_LOG_TIME) {
+      LOG.debug("{} loop took {} seconds to finish.", threadTypeName, elapsedTime/1000);
+    } else {
+      LOG.warn("Possible {} slowdown, loop took {} seconds to finish.", threadTypeName, elapsedTime/1000);
+    }
+
+  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a434bacfdd6..08848a61390 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -100,7 +100,6 @@ public class Initiator extends MetaStoreCompactorThread {
 
   static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
 
-  private long checkInterval;
   private ExecutorService compactionExecutor;
   private Optional<Cache<String, TBase>> metaCache = Optional.empty();
   private boolean metricsEnabled;
@@ -228,11 +227,8 @@ public class Initiator extends MetaStoreCompactorThread {
         }
 
         long elapsedTime = System.currentTimeMillis() - startedAt;
-        if (elapsedTime < checkInterval && !stop.get()) {
-          Thread.sleep(checkInterval - elapsedTime);
-        }
+        doPostLoopActions(elapsedTime, CompactorThreadType.INITIATOR);
 
-        LOG.info("Initiator thread finished one loop.");
       } while (!stop.get());
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, exiting.", t);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fea7ff02289..7667cfc971e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -95,6 +95,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
     ExecutorService executor = getTimeoutHandlingExecutor();
     try {
       do {
+        long startedAt = System.currentTimeMillis();
         Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(genericStats, mrStats));
         try {
           launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
@@ -123,8 +124,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
           } catch (InterruptedException e) {
           }
         }
-        LOG.info("Worker thread finished one loop.");
+        long elapsedTime = System.currentTimeMillis() - startedAt;
+        doPostLoopActions(elapsedTime, CompactorThreadType.WORKER);
       } while (!stop.get());
+    } catch (Throwable t) {
+      LOG.error("Caught an exception in the main loop of compactor worker, exiting.", t);
     } finally {
       if (executor != null) {
         executor.shutdownNow();

Reply via email to