This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new e8fca5116d3 HIVE-26770: Make end of loop compaction logs appear more selectively and reduce code duplication (Akshat Mathur, reviewed by Denys Kuzmenko) e8fca5116d3 is described below commit e8fca5116d34bc915f0575fba360a267ff59004b Author: Akshat <akshatats...@gmail.com> AuthorDate: Wed Dec 7 15:44:34 2022 +0530 HIVE-26770: Make end of loop compaction logs appear more selectively and reduce code duplication (Akshat Mathur, reviewed by Denys Kuzmenko) Closes #3832 --- .../hadoop/hive/ql/txn/compactor/Cleaner.java | 8 ++------ .../hive/ql/txn/compactor/CompactorThread.java | 24 +++++++++++++++++++++- .../hadoop/hive/ql/txn/compactor/Initiator.java | 6 +----- .../hadoop/hive/ql/txn/compactor/Worker.java | 6 +++++- 4 files changed, 31 insertions(+), 13 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java index 2dc2fa873ed..9d203c880ed 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java @@ -95,7 +95,6 @@ public class Cleaner extends MetaStoreCompactorThread { static final private String CLASS_NAME = Cleaner.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - private long cleanerCheckInterval = 0; private boolean metricsEnabled = false; private ReplChangeManager replChangeManager; @@ -105,7 +104,7 @@ public class Cleaner extends MetaStoreCompactorThread { public void init(AtomicBoolean stop) throws Exception { super.init(stop); replChangeManager = ReplChangeManager.getInstance(conf); - cleanerCheckInterval = conf.getTimeVar( + checkInterval = conf.getTimeVar( HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL, TimeUnit.MILLISECONDS); cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory( conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM), @@ -181,10 +180,7 @@ public class Cleaner extends MetaStoreCompactorThread { } // Now, go back to bed until it's time to do this again long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime < cleanerCheckInterval && !stop.get()) { - Thread.sleep(cleanerCheckInterval - elapsedTime); - } - LOG.debug("Cleaner thread finished one loop."); + doPostLoopActions(elapsedTime, CompactorThreadType.CLEANER); } while (!stop.get()); } catch (InterruptedException ie) { LOG.error("Compactor cleaner thread interrupted, exiting " + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java index 167b2728f54..215e38d37c5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java @@ -61,6 +61,14 @@ public abstract class CompactorThread extends Thread implements Configurable { protected String hostName; protected String runtimeVersion; + //Time threshold for compactor thread log + //In milliseconds: + private static final Integer MAX_WARN_LOG_TIME = 1200000; //20 min + + protected long checkInterval = 0; + + enum CompactorThreadType {INITIATOR, WORKER, CLEANER} + @Override public void setConf(Configuration configuration) { // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with @@ -196,7 +204,7 @@ public abstract class CompactorThread extends Thread implements Configurable { protected String getRuntimeVersion() { return this.getClass().getPackage().getImplementationVersion(); } - + protected LockRequest createLockRequest(CompactionInfo ci, long txnId, LockType lockType, DataOperationType opType) { String agentInfo = Thread.currentThread().getName(); LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo); @@ -219,4 +227,18 @@ public abstract class CompactorThread extends Thread implements Configurable { !conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK)); return requestBuilder.build(); } + + protected void doPostLoopActions(long elapsedTime, CompactorThreadType type) throws InterruptedException { + String threadTypeName = type.name(); + if (elapsedTime < checkInterval && !stop.get()) { + Thread.sleep(checkInterval - elapsedTime); + } + + if (elapsedTime < MAX_WARN_LOG_TIME) { + LOG.debug("{} loop took {} seconds to finish.", threadTypeName, elapsedTime/1000); + } else { + LOG.warn("Possible {} slowdown, loop took {} seconds to finish.", threadTypeName, elapsedTime/1000); + } + + } } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java index a434bacfdd6..08848a61390 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java @@ -100,7 +100,6 @@ public class Initiator extends MetaStoreCompactorThread { static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold."; - private long checkInterval; private ExecutorService compactionExecutor; private Optional<Cache<String, TBase>> metaCache = Optional.empty(); private boolean metricsEnabled; @@ -228,11 +227,8 @@ public class Initiator extends MetaStoreCompactorThread { } long elapsedTime = System.currentTimeMillis() - startedAt; - if (elapsedTime < checkInterval && !stop.get()) { - Thread.sleep(checkInterval - elapsedTime); - } + doPostLoopActions(elapsedTime, CompactorThreadType.INITIATOR); - LOG.info("Initiator thread finished one loop."); } while (!stop.get()); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor initiator, exiting.", t); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index fea7ff02289..7667cfc971e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -95,6 +95,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { ExecutorService executor = getTimeoutHandlingExecutor(); try { do { + long startedAt = System.currentTimeMillis(); Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(genericStats, mrStats)); try { launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS); @@ -123,8 +124,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { } catch (InterruptedException e) { } } - LOG.info("Worker thread finished one loop."); + long elapsedTime = System.currentTimeMillis() - startedAt; + doPostLoopActions(elapsedTime, CompactorThreadType.WORKER); } while (!stop.get()); + } catch (Throwable t) { + LOG.error("Caught an exception in the main loop of compactor worker, exiting.", t); } finally { if (executor != null) { executor.shutdownNow();