This is an automated email from the ASF dual-hosted git repository. dkuzmenko pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new d8f9a977cf3 HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh) d8f9a977cf3 is described below commit d8f9a977cf37afcb2b5cdc7b1ea9e57e26908bc3 Author: Akshat <akshatats...@gmail.com> AuthorDate: Fri Jan 27 22:25:54 2023 +0530 HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh) Closes #3955 --- .../java/org/apache/hadoop/hive/conf/HiveConf.java | 9 +++++++ .../hadoop/hive/ql/txn/compactor/Worker.java | 30 ++++++++++++++++------ .../hadoop/hive/ql/txn/compactor/TestWorker.java | 2 ++ 3 files changed, 33 insertions(+), 8 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index e2aa4cf845d..ea7c56d10f9 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3215,6 +3215,15 @@ public class HiveConf extends Configuration { "Time in seconds after which a compaction job will be declared failed and the\n" + "compaction re-queued."), + HIVE_COMPACTOR_WORKER_SLEEP_TIME("hive.compactor.worker.sleep.time", "10800ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Time in milliseconds for which a worker threads goes into sleep before starting another iteration " + + "in case of no launched job or error"), + + HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME("hive.compactor.worker.max.sleep.time", "320000ms", + new TimeValidator(TimeUnit.MILLISECONDS), + "Max time in milliseconds for which a worker threads goes into sleep before starting another iteration " + + "used for backoff in case of no launched job or error"), HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", "300s", new TimeValidator(TimeUnit.SECONDS), "Time in seconds between checks to see if any tables or partitions need to be\n" + diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index b8e94509b81..b0bec164fce 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -79,7 +79,8 @@ import java.util.stream.Collectors; public class Worker extends RemoteCompactorThread implements MetaStoreThread { static final private String CLASS_NAME = Worker.class.getName(); static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME); - static final private long SLEEP_TIME = 10000; + private static long SLEEP_TIME_MAX; + static private long SLEEP_TIME; private String workerName; private final CompactorFactory compactorFactory; @@ -102,12 +103,14 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { boolean genericStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS); boolean mrStats = conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS); long timeout = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, TimeUnit.MILLISECONDS); + long nextSleep = SLEEP_TIME; boolean launchedJob; ExecutorService executor = getTimeoutHandlingExecutor(); try { do { long startedAt = System.currentTimeMillis(); - launchedJob = true; + boolean err = false; + launchedJob = false; Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(genericStats, mrStats)); try { launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS); @@ -118,24 +121,33 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { singleRun.cancel(true); executor.shutdownNow(); executor = getTimeoutHandlingExecutor(); + err = true; } catch (ExecutionException e) { LOG.info("Exception during executing compaction", e); + err = true; } catch (InterruptedException ie) { - // do not ignore interruption requests - return; + Thread.currentThread().interrupt(); + } catch (Throwable t) { + err = true; } doPostLoopActions(System.currentTimeMillis() - startedAt); // If we didn't try to launch a job it either means there was no work to do or we got - // here as the result of a communication failure with the DB. Either way we want to wait + // here as the result of an error like communication failure with the DB, schema failures etc. Either way we want to wait // a bit before, otherwise we can start over the loop immediately. - if (!launchedJob && !stop.get()) { - Thread.sleep(SLEEP_TIME); + if ((!launchedJob || err) && !stop.get()) { + Thread.sleep(nextSleep); } + //Backoff mechanism + //Increase sleep time if error persist + //Reset sleep time to default once error is resolved + nextSleep = (err) ? nextSleep * 2 : SLEEP_TIME; + if (nextSleep > SLEEP_TIME_MAX) nextSleep = SLEEP_TIME_MAX; + } while (!stop.get()); } catch (InterruptedException e) { - // do not ignore interruption requests + Thread.currentThread().interrupt(); } catch (Throwable t) { LOG.error("Caught an exception in the main loop of compactor worker, exiting.", t); } finally { @@ -152,6 +164,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { @Override public void init(AtomicBoolean stop) throws Exception { super.init(stop); + SLEEP_TIME = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_SLEEP_TIME, TimeUnit.MILLISECONDS); + SLEEP_TIME_MAX = conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME, TimeUnit.MILLISECONDS); this.workerName = getWorkerId(); setName(workerName); } diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 72fad1c1efc..905d82a0659 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -1201,6 +1201,8 @@ public class TestWorker extends CompactorTest { ExecutorService executor = Executors.newSingleThreadExecutor(); HiveConf timeoutConf = new HiveConf(conf); timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, timeout, TimeUnit.MILLISECONDS); + timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_SLEEP_TIME, 20, TimeUnit.MILLISECONDS); + timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME, 20, TimeUnit.MILLISECONDS); TimeoutWorker timeoutWorker = getTimeoutWorker(timeoutConf, executor, runForever, swallowInterrupt, new CountDownLatch(2));