This is an automated email from the ASF dual-hosted git repository. ndimiduk pushed a commit to branch branch-1 in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push: new 46f6d46 HBASE-24658 Update PolicyBasedChaosMonkey to handle uncaught exceptions 46f6d46 is described below commit 46f6d46b646dd747e8bbcc225716baa4e44fdb21 Author: Nick Dimiduk <ndimi...@apache.org> AuthorDate: Mon Jun 29 16:35:26 2020 -0700 HBASE-24658 Update PolicyBasedChaosMonkey to handle uncaught exceptions Running `ServerKillingChaosMonkey` via `RESTApiClusterManager` for any duration of time slowly leaks region servers. I see failures on the RESTApi side go unreported on the ChaosMonkey side. It seems like `RuntimeException`s are being thrown and lost. `PolicyBasedChaosMonkey` uses a primitive means of thread management anyway. Update to use a thread pool, thread groups, and an uncaughtExceptionHandler. Signed-off-by: Bharath Vissapragada <bhara...@apache.org> Signed-off-by: Viraj Jasani <vjas...@apache.org> --- .../chaos/monkies/PolicyBasedChaosMonkey.java | 72 ++++++++++++---------- 1 file changed, 38 insertions(+), 34 deletions(-) diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java index 277e221..4c5d701 100644 --- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java +++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/monkies/PolicyBasedChaosMonkey.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -18,11 +18,16 @@ package org.apache.hadoop.hbase.chaos.monkies; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang.math.RandomUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -37,28 +42,44 @@ public class PolicyBasedChaosMonkey extends ChaosMonkey { private static final Log LOG = LogFactory.getLog(PolicyBasedChaosMonkey.class); private static final long ONE_SEC = 1000; - private static final long FIVE_SEC = 5 * ONE_SEC; private static final long ONE_MIN = 60 * ONE_SEC; public static final long TIMEOUT = ONE_MIN; final IntegrationTestingUtility util; + private final Policy[] policies; + private final ExecutorService monkeyThreadPool; + /** * Construct a new ChaosMonkey * @param util the HBaseIntegrationTestingUtility already configured * @param policies custom policies to use */ - public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) { - this.util = util; - this.policies = policies; + public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) { + this(util, policies.toArray(new Policy[0])); } - public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Collection<Policy> policies) { - this.util = util; - this.policies = policies.toArray(new Policy[policies.size()]); + public PolicyBasedChaosMonkey(IntegrationTestingUtility util, Policy... policies) { + this.util = Objects.requireNonNull(util); + this.policies = Objects.requireNonNull(policies); + if (policies.length == 0) { + throw new IllegalArgumentException("policies may not be empty"); + } + this.monkeyThreadPool = buildMonkeyThreadPool(policies.length); } + private static ExecutorService buildMonkeyThreadPool(final int size) { + return Executors.newFixedThreadPool(size, new ThreadFactoryBuilder() + .setDaemon(false) + .setNameFormat("ChaosMonkey-%d") + .setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + @Override public void uncaughtException(Thread t, Throwable e) { + throw new RuntimeException(e); + } + }) + .build()); + } /** Selects a random item from the given items */ public static <T> T selectRandomItem(T[] items) { @@ -99,27 +120,20 @@ public class PolicyBasedChaosMonkey extends ChaosMonkey { return originalItems.subList(startIndex, startIndex + selectedNumber); } - private Policy[] policies; - private Thread[] monkeyThreads; - @Override public void start() throws Exception { - monkeyThreads = new Thread[policies.length]; - - for (int i=0; i<policies.length; i++) { - policies[i].init(new Policy.PolicyContext(this.util)); - Thread monkeyThread = new Thread(policies[i], "ChaosMonkeyThread"); - monkeyThread.start(); - monkeyThreads[i] = monkeyThread; + final Policy.PolicyContext context = new Policy.PolicyContext(this.util); + for (final Policy policy : policies) { + policy.init(context); + monkeyThreadPool.execute(policy); } } @Override public void stop(String why) { - if (policies == null) { - return; - } - + // stop accepting new work (shouldn't be any with a fixed-size pool) + monkeyThreadPool.shutdown(); + // notify all executing policies that it's time to halt. for (Policy policy : policies) { policy.stop(why); } @@ -127,22 +141,12 @@ public class PolicyBasedChaosMonkey extends ChaosMonkey { @Override public boolean isStopped() { - return policies[0].isStopped(); + return monkeyThreadPool.isTerminated(); } - /** - * Wait for ChaosMonkey to stop. - * @throws InterruptedException - */ @Override public void waitForStop() throws InterruptedException { - if (monkeyThreads == null) { - return; - } - for (Thread monkeyThread : monkeyThreads) { - // TODO: bound the wait time per policy - monkeyThread.join(); - } + monkeyThreadPool.awaitTermination(1, TimeUnit.MINUTES); } @Override