This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new d8f9a977cf3 HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh)
d8f9a977cf3 is described below

commit d8f9a977cf37afcb2b5cdc7b1ea9e57e26908bc3
Author: Akshat <akshatats...@gmail.com>
AuthorDate: Fri Jan 27 22:25:54 2023 +0530

    HIVE-26947: Handle compaction.Worker sleep for err cases to avoid high frequency connection respawning to HMS (Akshat Mathur, reviewed by Denys Kuzmenko, Laszlo Vegh)
    
    Closes #3955
---
 .../java/org/apache/hadoop/hive/conf/HiveConf.java |  9 +++++++
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 30 ++++++++++++++++------
 .../hadoop/hive/ql/txn/compactor/TestWorker.java   |  2 ++
 3 files changed, 33 insertions(+), 8 deletions(-)

diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index e2aa4cf845d..ea7c56d10f9 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3215,6 +3215,15 @@ public class HiveConf extends Configuration {
         "Time in seconds after which a compaction job will be declared failed 
and the\n" +
         "compaction re-queued."),
 
+    HIVE_COMPACTOR_WORKER_SLEEP_TIME("hive.compactor.worker.sleep.time", 
"10800ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Time in milliseconds for which a worker threads goes into sleep 
before starting another iteration " +
+                "in case of no launched job or error"),
+
+    
HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME("hive.compactor.worker.max.sleep.time", 
"320000ms",
+        new TimeValidator(TimeUnit.MILLISECONDS),
+        "Max time in milliseconds for which a worker threads goes into sleep 
before starting another iteration " +
+                "used for backoff in case of no launched job or error"),
     HIVE_COMPACTOR_CHECK_INTERVAL("hive.compactor.check.interval", "300s",
         new TimeValidator(TimeUnit.SECONDS),
         "Time in seconds between checks to see if any tables or partitions 
need to be\n" +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index b8e94509b81..b0bec164fce 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -79,7 +79,8 @@ import java.util.stream.Collectors;
 public class Worker extends RemoteCompactorThread implements MetaStoreThread {
   static final private String CLASS_NAME = Worker.class.getName();
   static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
-  static final private long SLEEP_TIME = 10000;
+  private static long SLEEP_TIME_MAX;
+  static private long SLEEP_TIME;
 
   private String workerName;
   private final CompactorFactory compactorFactory;
@@ -102,12 +103,14 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     boolean genericStats = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS);
     boolean mrStats = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_MR_COMPACTOR_GATHER_STATS);
     long timeout = 
conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 
TimeUnit.MILLISECONDS);
+    long nextSleep = SLEEP_TIME;
     boolean launchedJob;
     ExecutorService executor = getTimeoutHandlingExecutor();
     try {
       do {
         long startedAt = System.currentTimeMillis();
-        launchedJob = true;
+        boolean err = false;
+        launchedJob = false;
         Future<Boolean> singleRun = executor.submit(() -> 
findNextCompactionAndExecute(genericStats, mrStats));
         try {
           launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
@@ -118,24 +121,33 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
           singleRun.cancel(true);
           executor.shutdownNow();
           executor = getTimeoutHandlingExecutor();
+          err = true;
         } catch (ExecutionException e) {
           LOG.info("Exception during executing compaction", e);
+          err = true;
         } catch (InterruptedException ie) {
-          // do not ignore interruption requests
-          return;
+          Thread.currentThread().interrupt();
+        } catch (Throwable t) {
+          err = true;
         }
 
         doPostLoopActions(System.currentTimeMillis() - startedAt);
 
         // If we didn't try to launch a job it either means there was no work 
to do or we got
-        // here as the result of a communication failure with the DB.  Either 
way we want to wait
+        // here as the result of an error like communication failure with the 
DB, schema failures etc.  Either way we want to wait
         // a bit before, otherwise we can start over the loop immediately.
-        if (!launchedJob && !stop.get()) {
-          Thread.sleep(SLEEP_TIME);
+        if ((!launchedJob || err) && !stop.get()) {
+          Thread.sleep(nextSleep);
         }
+        //Backoff mechanism
+        //Increase sleep time if error persist
+        //Reset sleep time to default once error is resolved
+        nextSleep = (err) ? nextSleep * 2 : SLEEP_TIME;
+        if (nextSleep > SLEEP_TIME_MAX) nextSleep = SLEEP_TIME_MAX;
+
       } while (!stop.get());
     } catch (InterruptedException e) {
-      // do not ignore interruption requests
+      Thread.currentThread().interrupt();
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor worker, 
exiting.", t);
     } finally {
@@ -152,6 +164,8 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
   @Override
   public void init(AtomicBoolean stop) throws Exception {
     super.init(stop);
+    SLEEP_TIME = 
conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_SLEEP_TIME, 
TimeUnit.MILLISECONDS);
+    SLEEP_TIME_MAX = 
conf.getTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME, 
TimeUnit.MILLISECONDS);
     this.workerName = getWorkerId();
     setName(workerName);
   }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
index 72fad1c1efc..905d82a0659 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
@@ -1201,6 +1201,8 @@ public class TestWorker extends CompactorTest {
     ExecutorService executor = Executors.newSingleThreadExecutor();
     HiveConf timeoutConf = new HiveConf(conf);
     timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 
timeout, TimeUnit.MILLISECONDS);
+    timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_SLEEP_TIME, 
20, TimeUnit.MILLISECONDS);
+    
timeoutConf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_MAX_SLEEP_TIME, 
20, TimeUnit.MILLISECONDS);
 
     TimeoutWorker timeoutWorker = getTimeoutWorker(timeoutConf, executor,
         runForever, swallowInterrupt, new CountDownLatch(2));

Reply via email to