This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/elasticity by this push:
     new 9d80b65f0e fixes clean shutdown bug in manager (#4248)
9d80b65f0e is described below

commit 9d80b65f0eec60ad6d137cc8400798380cc55fe7
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Feb 9 18:04:17 2024 -0500

    fixes clean shutdown bug in manager (#4248)
    
    This commit fixes #4209 by shutting down fate before unassigning any
    tablets on which fate depends.
---
 .../java/org/apache/accumulo/core/fate/Fate.java   | 47 +++++++++++++++++-----
 .../accumulo/core/fate/ReadOnlyFateStore.java      |  2 +
 .../org/apache/accumulo/core/fate/ZooStore.java    |  5 +++
 .../accumulo/core/fate/accumulo/AccumuloStore.java |  5 +++
 .../apache/accumulo/core/logging/FateLogger.java   |  6 +++
 .../org/apache/accumulo/core/fate/TestStore.java   |  5 +++
 .../java/org/apache/accumulo/manager/Manager.java  |  9 ++++-
 .../java/org/apache/accumulo/test/fate/FateIT.java | 12 +++---
 8 files changed, 74 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 35807ee0fc..3828bb80c4 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -459,20 +459,45 @@ public class Fate<T> {
   }
 
   /**
-   * Flags that FATE threadpool to clear out and end. Does not actively stop 
running FATE processes.
+   * Initiates shutdown of background threads and optionally waits on them.
    */
-  public void shutdown() {
-    keepRunning.set(false);
-    fatePoolWatcher.shutdown();
-    if (executor != null) {
+  public void shutdown(long timeout, TimeUnit timeUnit) {
+    if (keepRunning.compareAndSet(true, false)) {
+      fatePoolWatcher.shutdown();
       executor.shutdown();
+      workFinder.interrupt();
     }
-    workFinder.interrupt();
-    try {
-      workFinder.join();
-    } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+
+    if (timeout > 0) {
+      long start = System.nanoTime();
+
+      while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
+          && (workFinder.isAlive() || !executor.isTerminated())) {
+        try {
+          if (!executor.awaitTermination(1, SECONDS)) {
+            log.debug("Fate {} is waiting for worker threads to terminate", 
store.type());
+            continue;
+          }
+
+          workFinder.join(1_000);
+          if (workFinder.isAlive()) {
+            log.debug("Fate {} is waiting for work finder thread to 
terminate", store.type());
+            workFinder.interrupt();
+          }
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      if (workFinder.isAlive() || !executor.isTerminated()) {
+        log.warn(
+            "Waited for {}ms for all fate {} background threads to stop, but 
some are still running. workFinder:{} executor:{}",
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start), 
store.type(),
+            workFinder.isAlive(), !executor.isTerminated());
+      }
     }
-  }
 
+    // interrupt the background threads
+    executor.shutdownNow();
+  }
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
index deb79413c9..bdbb7739f9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ReadOnlyFateStore.java
@@ -158,4 +158,6 @@ public interface ReadOnlyFateStore<T> {
    * @return the current number of transactions that have been deferred
    */
   int getDeferredCount();
+
+  FateInstanceType type();
 }
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
index d0ef960054..6813e727c5 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/ZooStore.java
@@ -118,6 +118,11 @@ public class ZooStore<T> extends AbstractFateStore<T> {
     return new Pair<>(node.status, node.fateKey);
   }
 
+  @Override
+  public FateInstanceType type() {
+    return fateInstanceType;
+  }
+
   private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
 
     private FateTxStoreImpl(FateId fateId, boolean isReserved) {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
index 328560b150..7fd4b967cb 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/accumulo/AccumuloStore.java
@@ -251,6 +251,11 @@ public class AccumuloStore<T> extends AbstractFateStore<T> {
     }
   }
 
+  @Override
+  public FateInstanceType type() {
+    return fateInstanceType;
+  }
+
   private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
 
     private FateTxStoreImpl(FateId fateId, boolean isReserved) {
diff --git a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 0879fbaea8..d646389f92 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -27,6 +27,7 @@ import java.util.stream.Stream;
 
 import org.apache.accumulo.core.fate.Fate;
 import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
 import org.apache.accumulo.core.fate.FateKey;
 import org.apache.accumulo.core.fate.FateStore;
 import org.apache.accumulo.core.fate.FateStore.FateTxStore;
@@ -138,6 +139,11 @@ public class FateLogger {
         return store.getDeferredCount();
       }
 
+      @Override
+      public FateInstanceType type() {
+        return store.type();
+      }
+
       @Override
       public boolean isDeferredOverflow() {
         return store.isDeferredOverflow();
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 18089848df..6c69de60ef 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -226,6 +226,11 @@ public class TestStore implements FateStore<String> {
     return 0;
   }
 
+  @Override
+  public FateInstanceType type() {
+    throw new UnsupportedOperationException();
+  }
+
   @Override
   public boolean isDeferredOverflow() {
     return false;
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 8c1114df78..09b40386bb 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -648,9 +648,16 @@ public class Manager extends AbstractServer
             case CLEAN_STOP:
               switch (getManagerState()) {
                 case NORMAL:
+                  // USER fate stores its data in a user table and its 
operations may interact with
+                  // all tables, need to completely shut it down before 
unloading user tablets
+                  fate(FateInstanceType.USER).shutdown(1, MINUTES);
                   setManagerState(ManagerState.SAFE_MODE);
                   break;
                 case SAFE_MODE: {
+                  // META fate stores its data in Zookeeper and its operations 
interact with
+                  // metadata and root tablets, need to completely shut it 
down before unloading
+                  // metadata and root tablets
+                  fate(FateInstanceType.META).shutdown(1, MINUTES);
                   int count = nonMetaDataTabletsAssignedOrHosted();
                   log.debug(
                       String.format("There are %d non-metadata tablets 
assigned or hosted", count));
@@ -1145,7 +1152,7 @@ public class Manager extends AbstractServer
       sleepUninterruptibly(500, MILLISECONDS);
     }
     log.info("Shutting down fate.");
-    getFateRefs().keySet().forEach(type -> fate(type).shutdown());
+    getFateRefs().keySet().forEach(type -> fate(type).shutdown(0, MINUTES));
 
     splitter.stop();
 
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index a373a58c73..7bd350c577 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -176,7 +177,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
 
       Wait.waitFor(() -> getTxStatus(sctx, fateId) == UNKNOWN);
     } finally {
-      fate.shutdown();
+      fate.shutdown(10, TimeUnit.MINUTES);
     }
   }
 
@@ -210,7 +211,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
       fate.delete(fateId);
       assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
     } finally {
-      fate.shutdown();
+      fate.shutdown(10, TimeUnit.MINUTES);
     }
   }
 
@@ -245,7 +246,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
       fate.delete(fateId);
       assertEquals(UNKNOWN, getTxStatus(sctx, fateId));
     } finally {
-      fate.shutdown();
+      fate.shutdown(10, TimeUnit.MINUTES);
     }
   }
 
@@ -275,8 +276,9 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
       callStarted.await();
       // cancel the transaction
       assertFalse(fate.cancel(fateId));
+      finishCall.countDown();
     } finally {
-      fate.shutdown();
+      fate.shutdown(10, TimeUnit.MINUTES);
     }
 
   }
@@ -348,7 +350,7 @@ public abstract class FateIT extends SharedMiniClusterBase implements FateTestRu
       });
 
     } finally {
-      fate.shutdown();
+      fate.shutdown(10, TimeUnit.MINUTES);
     }
   }
 

Reply via email to