This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch elasticity in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push: new 9d80b65f0e fixes clean shutdown bug in manager (#4248) 9d80b65f0e is described below commit 9d80b65f0eec60ad6d137cc8400798380cc55fe7 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Feb 9 18:04:17 2024 -0500 fixes clean shutdown bug in manager (#4248) This commits fixes #4209 by shutting down fate before unassigning any tablets on which fate depends. --- .../java/org/apache/accumulo/core/fate/Fate.java | 47 +++++++++++++++++----- .../accumulo/core/fate/ReadOnlyFateStore.java | 2 + .../org/apache/accumulo/core/fate/ZooStore.java | 5 +++ .../accumulo/core/fate/accumulo/AccumuloStore.java | 5 +++ .../apache/accumulo/core/logging/FateLogger.java | 6 +++ .../org/apache/accumulo/core/fate/TestStore.java | 5 +++ .../java/org/apache/accumulo/manager/Manager.java | 9 ++++- .../java/org/apache/accumulo/test/fate/FateIT.java | 12 +++--- 8 files changed, 74 insertions(+), 17 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java index 35807ee0fc..3828bb80c4 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java @@ -459,20 +459,45 @@ public class Fate<T> { } /** - * Flags that FATE threadpool to clear out and end. Does not actively stop running FATE processes. + * Initiates shutdown of background threads and optionally waits on them. */ - public void shutdown() { - keepRunning.set(false); - fatePoolWatcher.shutdown(); - if (executor != null) { + public void shutdown(long timeout, TimeUnit timeUnit) { + if (keepRunning.compareAndSet(true, false)) { + fatePoolWatcher.shutdown(); executor.shutdown(); + workFinder.interrupt(); } - workFinder.interrupt(); - try { - workFinder.join(); - } catch (InterruptedException e) { - throw new RuntimeException(e); + + if (timeout > 0) { + long start = System.nanoTime(); + + while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) + && (workFinder.isAlive() || !executor.isTerminated())) { + try { + if (!executor.awaitTermination(1, SECONDS)) { + log.debug("Fate {} is waiting for worker threads to terminate", store.type()); + continue; + } + + workFinder.join(1_000); + if (workFinder.isAlive()) { + log.debug("Fate {} is waiting for work finder thread to terminate", store.type()); + workFinder.interrupt(); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + if (workFinder.isAlive() || !executor.isTerminated()) { + log.warn( + "Waited for {}ms for all fate {} background threads to stop, but some are still running. workFinder:{} executor:{}", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), store.type(), + workFinder.isAlive(), !executor.isTerminated()); + } } - } + // interrupt the background threads + executor.shutdownNow(); + } } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java index deb79413c9..bdbb7739f9 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java @@ -158,4 +158,6 @@ public interface ReadOnlyFateStore<T> { * @return the current number of transactions that have been deferred */ int getDeferredCount(); + + FateInstanceType type(); } diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java index d0ef960054..6813e727c5 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java @@ -118,6 +118,11 @@ public class ZooStore<T> extends AbstractFateStore<T> { return new Pair<>(node.status, node.fateKey); } + @Override + public FateInstanceType type() { + return fateInstanceType; + } + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { private FateTxStoreImpl(FateId fateId, boolean isReserved) { diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java index 328560b150..7fd4b967cb 100644 --- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java +++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java @@ -251,6 +251,11 @@ public class AccumuloStore<T> extends AbstractFateStore<T> { } } + @Override + public FateInstanceType type() { + return fateInstanceType; + } + private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> { private FateTxStoreImpl(FateId fateId, boolean isReserved) { diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java index 0879fbaea8..d646389f92 100644 --- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java +++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java @@ -27,6 +27,7 @@ import java.util.stream.Stream; import org.apache.accumulo.core.fate.Fate; import org.apache.accumulo.core.fate.FateId; +import org.apache.accumulo.core.fate.FateInstanceType; import org.apache.accumulo.core.fate.FateKey; import org.apache.accumulo.core.fate.FateStore; import org.apache.accumulo.core.fate.FateStore.FateTxStore; @@ -138,6 +139,11 @@ public class FateLogger { return store.getDeferredCount(); } + @Override + public FateInstanceType type() { + return store.type(); + } + @Override public boolean isDeferredOverflow() { return store.isDeferredOverflow(); diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java index 18089848df..6c69de60ef 100644 --- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java +++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java @@ -226,6 +226,11 @@ public class TestStore implements FateStore<String> { return 0; } + @Override + public FateInstanceType type() { + throw new UnsupportedOperationException(); + } + @Override public boolean isDeferredOverflow() { return false; diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 8c1114df78..09b40386bb 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -648,9 +648,16 @@ public class Manager extends AbstractServer case CLEAN_STOP: switch (getManagerState()) { case NORMAL: + // USER fate stores its data in a user table and its operations may interact with + // all tables, need to completely shut it down before unloading user tablets + fate(FateInstanceType.USER).shutdown(1, MINUTES); setManagerState(ManagerState.SAFE_MODE); break; case SAFE_MODE: { + // META fate stores its data in Zookeeper and its operations interact with + // metadata and root tablets, need to completely shut it down before unloading + // metadata and root tablets + fate(FateInstanceType.META).shutdown(1, MINUTES); int count = nonMetaDataTabletsAssignedOrHosted(); log.debug( String.format("There are %d non-metadata tablets assigned or hosted", count)); @@ -1145,7 +1152,7 @@ public class Manager extends AbstractServer sleepUninterruptibly(500, MILLISECONDS); } log.info("Shutting down fate."); - getFateRefs().keySet().forEach(type -> fate(type).shutdown()); + getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES)); splitter.stop(); diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java index a373a58c73..7bd350c577 100644 --- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java @@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -176,7 +177,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu Wait.waitFor(() -> getTxStatus(sctx, fateId) == UNKNOWN); } finally { - fate.shutdown(); + fate.shutdown(10, TimeUnit.MINUTES); } } @@ -210,7 +211,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu fate.delete(fateId); assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); } finally { - fate.shutdown(); + fate.shutdown(10, TimeUnit.MINUTES); } } @@ -245,7 +246,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu fate.delete(fateId); assertEquals(UNKNOWN, getTxStatus(sctx, fateId)); } finally { - fate.shutdown(); + fate.shutdown(10, TimeUnit.MINUTES); } } @@ -275,8 +276,9 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu callStarted.await(); // cancel the transaction assertFalse(fate.cancel(fateId)); + finishCall.countDown(); } finally { - fate.shutdown(); + fate.shutdown(10, TimeUnit.MINUTES); } } @@ -348,7 +350,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu }); } finally { - fate.shutdown(); + fate.shutdown(10, TimeUnit.MINUTES); } }