teamconfx opened a new issue, #6046:
URL: https://github.com/apache/accumulo/issues/6046

   **Describe the bug**
   Flushed mutations are intermittently lost after a tablet server restart. 
Data that was successfully acknowledged by `BatchWriter.flush()` may not be 
visible after the tablet server restarts and the tablet is reassigned. This 
violates Accumulo's durability guarantees when using the default `SYNC` 
durability setting.
   
   **Versions (OS, Maven, Java, and others, as appropriate):**
   - Affected version(s) of this project: 2.1.4
   - OS: Linux (Ubuntu)
   - Java: OpenJDK 17
   - Maven: 3.x
   
   **Root Cause Analysis**
   
   The root cause is a **missing JVM shutdown hook** in `AbstractServer` that 
would trigger graceful shutdown when SIGTERM is received.
   
   **The Problem:**
   
   1. When SIGTERM is sent to the tablet server process (via 
`Process.destroy()`), the JVM starts its shutdown sequence
   2. `AbstractServer.gracefulShutdown()` sets `shutdownRequested` to true, but this method is only called via RPC or when an `InterruptedException` is caught, **not when SIGTERM is received**
   3. Without a shutdown hook, `shutdownRequested` remains false during 
SIGTERM-initiated shutdown
   4. The main server loop in `TabletServer.run()` doesn't know it should 
perform graceful shutdown
   5. The JVM terminates before tablets are properly unloaded and data is 
flushed to files
   6. Data that was only in memory/WAL (not yet minor-compacted) is lost
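The sequence above can be illustrated with a deliberately simplified model. `SigtermModel`, `serverLoop()` and `terminate()` are hypothetical names, not Accumulo code; the model ignores RPC, locks and WAL handling and only captures the control-flow point: the unload step runs only if something sets `shutdownRequested` before the JVM halts, and on SIGTERM the JVM runs registered shutdown hooks and then halts.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Simplified, hypothetical model of the SIGTERM path (not Accumulo code). The server thread
 * only performs its graceful-shutdown work after shutdownRequested becomes true.
 */
class SigtermModel {
  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
  private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);
  private final Thread serverThread;

  SigtermModel() {
    serverThread = new Thread(this::serverLoop, "model-server");
    // Daemon, so an abandoned loop does not keep this demo JVM alive (mimics the JVM halting)
    serverThread.setDaemon(true);
  }

  /** Stand-in for the server's main loop: waits until shutdown is requested, then "unloads". */
  private void serverLoop() {
    while (!shutdownRequested.get()) {
      try {
        Thread.sleep(50); // stand-in for waiting on work
      } catch (InterruptedException e) {
        // An interrupt only wakes the loop; the flag decides whether it exits
      }
    }
    shutdownComplete.set(true); // stand-in for unloading tablets and flushing data
  }

  /**
   * Simulates receiving SIGTERM (call once per instance). Returns whether graceful shutdown
   * finished before the point at which the JVM would halt.
   */
  boolean terminate(boolean hookInstalled) {
    serverThread.start();
    if (hookInstalled) {
      // What a shutdown hook would do: request shutdown, wake the loop, wait (bounded)
      shutdownRequested.set(true);
      serverThread.interrupt();
      try {
        serverThread.join(5_000);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
    // Without a hook nothing ever sets shutdownRequested, so the loop never unloads
    return shutdownComplete.get();
  }

  public static void main(String[] args) {
    System.out.println("without hook: " + new SigtermModel().terminate(false));
    System.out.println("with hook:    " + new SigtermModel().terminate(true));
  }
}
```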
   
   **Real-World Production Scenarios:**
   
   This bug can occur in production when tablet servers are stopped using 
standard OS/container process management instead of Accumulo's admin tools:
   
   1. **Kubernetes/Container Orchestration**: When a pod is terminated during 
rolling deployments, node draining, resource pressure, or pod eviction, 
Kubernetes sends SIGTERM directly to the process. K8s has no knowledge of 
Accumulo's RPC-based shutdown mechanism.
   
   2. **Docker**: Running `docker stop <container>` sends SIGTERM to the 
container's main process.
   
   3. **Systemd Service Management**: Running `systemctl stop accumulo-tserver` 
sends SIGTERM to the service process.
   
   4. **Cloud Provider VM Lifecycle Events**: AWS Spot Instance interruptions, 
GCP Preemptible VM shutdowns, and Azure Spot VM evictions all send SIGTERM to 
processes before terminating the VM. These events can happen at any time with 
little advance warning (as little as 30 seconds on some providers).
   
   5. **Manual Process Termination**: An operator running `kill <pid>` sends 
SIGTERM by default.
   
   In all these cases, Accumulo's `gracefulShutdown()` RPC method is never 
called, so the tablet server exits without properly unloading tablets. This is 
particularly dangerous in cloud environments where VM preemption is common and 
expected.
   
   The Accumulo admin tools (`accumulo admin stop`) use RPC to call 
`gracefulShutdown()`, but external process managers and orchestration systems 
cannot use this mechanism.
   
   **Key Code Locations:**
   
   1. `server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java`:
      - Lines 162-184: `gracefulShutdown()` sets `shutdownRequested` but is not 
called on SIGTERM
      - Lines 63-64: `shutdownRequested` and `shutdownComplete` AtomicBooleans 
exist but aren't used in a shutdown hook
      - Lines 218-239: `runServer()` waits for the server thread but doesn't 
register a shutdown hook
   
   2. `server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java`:
      - Lines 953-974: Graceful shutdown logic that waits for tablets to be 
unloaded - only runs if `shutdownRequested` is true
   
   **To Reproduce**
   
   Add the following test to 
`test/src/main/java/org/apache/accumulo/test/functional/`:
   
   ```java
   package org.apache.accumulo.test.functional;
   
   import java.time.Duration;
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeSet;
   
   import org.apache.accumulo.core.client.Accumulo;
   import org.apache.accumulo.core.client.AccumuloClient;
   import org.apache.accumulo.core.client.BatchWriter;
   import org.apache.accumulo.core.client.Scanner;
   import org.apache.accumulo.core.client.admin.NewTableConfiguration;
   import org.apache.accumulo.core.client.admin.TimeType;
   import org.apache.accumulo.core.data.Mutation;
   import org.apache.accumulo.core.data.Range;
   import org.apache.accumulo.core.security.Authorizations;
   import org.apache.accumulo.harness.AccumuloClusterHarness;
   import org.apache.accumulo.minicluster.ServerType;
   import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
   import org.apache.accumulo.miniclusterImpl.ProcessReference;
   import org.apache.hadoop.io.Text;
   import org.junit.jupiter.api.Test;
   import static org.junit.jupiter.api.Assertions.assertTrue;
   import static org.junit.jupiter.api.Assertions.fail;
   
   public class DataLossAfterRestartIT extends AccumuloClusterHarness {
   
     @Override
     protected Duration defaultTimeout() {
       return Duration.ofMinutes(5);
     }
   
     @Test
     public void testDataLossAfterRestart() throws Exception {
       String tableName = getUniqueNames(1)[0];
   
       try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
         // Create table with logical time and a split
         TreeSet<Text> splits = new TreeSet<>();
         splits.add(new Text("m"));
         client.tableOperations().create(tableName,
             new NewTableConfiguration().setTimeType(TimeType.LOGICAL).withSplits(splits));
   
         BatchWriter bw = client.createBatchWriter(tableName);
   
         // Write first mutation (row 'a')
         Mutation m1 = new Mutation("a");
         m1.put("cf", "cq", "v");
         bw.addMutation(m1);
         bw.flush();
   
         // Perform merge
         client.tableOperations().merge(tableName, null, null);
   
         // Write second mutation (row 'b')
         Mutation m2 = new Mutation("b");
         m2.put("cf", "cq", "v");
         bw.addMutation(m2);
         bw.flush();
   
         // At this point, both row 'a' and row 'b' should be persisted (SYNC durability)
   
         // restart a tablet server using killProcess and start
         MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster();
         List<ProcessReference> tservers = new ArrayList<>(
             cluster.getProcesses().get(ServerType.TABLET_SERVER));
   
         if (!tservers.isEmpty()) {
           ProcessReference tserver = tservers.get(0);
   
           // Kill the tablet server (sends SIGTERM, waits up to 30 seconds)
           cluster.killProcess(ServerType.TABLET_SERVER, tserver);
   
           // Start a replacement tablet server
           cluster.getClusterControl().start(ServerType.TABLET_SERVER);
   
           // Wait for cluster to stabilize
           client.instanceOperations().waitForBalance();
         }
   
         bw.close();
   
         // Now scan for row 'b' - this should succeed but intermittently fails
         try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
           scanner.setRange(new Range("b"));
           // Use one iterator for both the existence check and the read below
           var iter = scanner.iterator();

           if (!iter.hasNext()) {
             // Check what data exists
             try (Scanner fullScan = client.createScanner(tableName, Authorizations.EMPTY)) {
               fullScan.setRange(new Range());
               StringBuilder sb = new StringBuilder();
               int count = 0;
               for (var entry : fullScan) {
                 sb.append(entry.getKey().getRow()).append(" ");
                 count++;
               }
               fail("DATA LOSS DETECTED: Row 'b' not found after restart. "
                   + "Full table scan found " + count + " entries: " + sb.toString());
             }
           }

           // Verify the data
           long time = iter.next().getKey().getTimestamp();
           assertTrue(time > 0, "Expected positive timestamp");
         }
       }
     }
   }
   ```
   
   Run the test (may need multiple runs to reproduce, ~66% failure rate):
   ```bash
   mvn -pl :accumulo-test failsafe:integration-test \
       -Dit.test=DataLossAfterRestartIT#testDataLossAfterRestart
   ```
   
   **Expected behavior**
   After `BatchWriter.flush()` returns with the default `SYNC` durability, the 
data should be persisted to the WAL and should survive a tablet server restart. 
When the tablet is reassigned and WAL recovery completes, all flushed data 
should be visible to subsequent scans.
   
   **Screenshots**
   Debug output showing data loss:
   ```
   DATA LOSS DETECTED: Row 'b' not found after restart. Full table scan found 1 entries: a
   ```
   
   This shows that row 'a' (written earlier, likely minor-compacted during 
merge) survives but row 'b' (written and flushed just before restart, only in 
WAL) is lost.
   
   **Additional context**
   - The failure is intermittent (~2 out of 3 runs fail), suggesting 
timing-dependent behavior
   - The default durability is `SYNC` which should guarantee data persistence 
before `flush()` returns
   - The `killProcess()` method uses `stopProcessWithTimeout(process, 30, 
TimeUnit.SECONDS)` which calls `process.destroy()` (SIGTERM)
   - Without a shutdown hook, the tablet server doesn't properly unload tablets 
before the JVM exits
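The stop behaviour described above (SIGTERM followed by a bounded wait) can be sketched with plain JDK `Process` calls. `ProcessStopper` and `stopWithTimeout` are hypothetical names, and the SIGKILL fallback is an assumption about the general pattern rather than a description of `MiniAccumuloClusterImpl`. On Linux the JDK implements `Process.destroy()` with SIGTERM and `destroyForcibly()` with SIGKILL; only the former gives a child JVM the chance to run its shutdown hooks.

```java
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper (not the MiniAccumuloClusterImpl code): stop a child process politely. */
final class ProcessStopper {
  private ProcessStopper() {}

  /**
   * Requests normal termination, waits up to the timeout, and falls back to a forcible kill.
   * Returns true if the process exited on its own within the timeout.
   */
  static boolean stopWithTimeout(Process process, long timeout, TimeUnit unit)
      throws InterruptedException {
    process.destroy(); // SIGTERM on Linux; a child JVM would run its shutdown hooks
    if (process.waitFor(timeout, unit)) {
      return true;
    }
    process.destroyForcibly(); // SIGKILL on Linux: no shutdown hooks run
    process.waitFor();
    return false;
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    // Any long-running child works here; `sleep` exits as soon as it receives SIGTERM
    Process child = new ProcessBuilder("sleep", "60").start();
    boolean exited = stopWithTimeout(child, 30, TimeUnit.SECONDS);
    System.out.println("exited within timeout: " + exited + ", exit value: " + child.exitValue());
  }
}
```

Whether a stop path of this shape ever reaches the forcible kill depends on how long the child's shutdown hooks take; the proposed patch bounds its own hook at 30 seconds.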
   
   **Proposed Fix**
   
   Add a JVM shutdown hook in `AbstractServer.runServer()` that:
   1. Sets `shutdownRequested` to true when SIGTERM is received
   2. Waits for `shutdownComplete` to be set before allowing the JVM to exit
   
   See patch file: `patches/BUG-GROUP-3.patch`
   
   **Fix Verification**
   
   After applying the patch, the test was run 10 times and passed all 10 runs:
   - Before fix: ~33% pass rate (1 out of 3 runs)
   - After fix: 100% pass rate (10 out of 10 runs)
   
   The shutdown hook ensures that when SIGTERM is received:
   1. `shutdownRequested` is set to true, signaling the server to begin 
graceful shutdown
   2. The server thread is interrupted to wake it from any wait/sleep
   3. The hook waits up to 30 seconds for `shutdownComplete` to be set, 
allowing tablets to be properly unloaded
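The hook behaviour listed above can also be written with a `CountDownLatch` in place of the 100 ms polling loop used in the patch. This is a sketch of an alternative, not the patch itself; `ShutdownCoordinator`, `onShutdownSignal` and `install` are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch (not Accumulo code): coordinates a SIGTERM shutdown hook with a server
 * thread, using a latch instead of polling a completion flag.
 */
class ShutdownCoordinator {
  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
  private final CountDownLatch shutdownComplete = new CountDownLatch(1);

  /** Checked by the server's main loop, like shutdownRequested in AbstractServer. */
  boolean isShutdownRequested() {
    return shutdownRequested.get();
  }

  /** Called by the server thread after tablets are unloaded. */
  void markComplete() {
    shutdownComplete.countDown();
  }

  /** Hook body: request shutdown, wake the server thread, wait at most the given timeout. */
  boolean onShutdownSignal(Thread serverThread, long timeout, TimeUnit unit) {
    if (shutdownRequested.compareAndSet(false, true) && serverThread.isAlive()) {
      serverThread.interrupt(); // wake the server thread from any wait/sleep
    }
    try {
      // Wait even if shutdown was already requested elsewhere (e.g. via RPC)
      return shutdownComplete.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Registers the hook; the caller can remove it on a normal exit, as the patch does. */
  Thread install(Thread serverThread, long timeout, TimeUnit unit) {
    Thread hook =
        new Thread(() -> onShutdownSignal(serverThread, timeout, unit), "shutdown-hook");
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }
}
```

One design difference from the patch: the patch's hook only waits when its own `compareAndSet(false, true)` succeeds, so if shutdown was already requested (for example through the `gracefulShutdown()` RPC) the hook returns immediately and the JVM may halt mid-unload. The sketch waits in both cases.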
   
   Stack trace (from `LogicalTimeIT.runMergeTest`):
   ```
   java.util.NoSuchElementException
       at org.apache.accumulo.core.clientImpl.ScannerIterator.next(ScannerIterator.java:123)
       at org.apache.accumulo.core.clientImpl.ScannerIterator.next(ScannerIterator.java:46)
       at org.apache.accumulo.test.functional.LogicalTimeIT.runMergeTest(LogicalTimeIT.java:126)
       at org.apache.accumulo.test.functional.LogicalTimeIT.run(LogicalTimeIT.java:52)
   ```
   
   **Proposed Fix (patch):**
   ```diff
   --- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
   +++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
   @@ -21,6 +21,7 @@ package org.apache.accumulo.server;
    import java.util.OptionalInt;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
   +import java.util.concurrent.atomic.AtomicLong;
    import java.util.concurrent.atomic.AtomicReference;
   
    import org.apache.accumulo.core.Constants;
   @@ -59,6 +60,9 @@ public abstract class AbstractServer
      protected final long idleReportingPeriodNanos;
      private volatile long idlePeriodStartNanos = 0L;
      private volatile Thread serverThread;
   +  private volatile Thread shutdownHookThread;
   +  private static final long SHUTDOWN_WAIT_TIMEOUT_MS = 30000; // 30 seconds max wait
   +  private static final Logger LOG = LoggerFactory.getLogger(AbstractServer.class);
      private volatile Thread verificationThread;
      private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
      private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);
   @@ -215,10 +219,47 @@ public abstract class AbstractServer
       *   ServiceLock.unlock(serverLock);
       * }
       * </pre>
   +   *
   +   * This method also registers a JVM shutdown hook that triggers graceful shutdown
   +   * when SIGTERM is received, ensuring data is properly flushed before the process exits.
       */
      public void runServer() throws Exception {
        final AtomicReference<Throwable> err = new AtomicReference<>();
        serverThread = new Thread(TraceUtil.wrap(this), applicationName);
   +
   +    // Register shutdown hook to handle SIGTERM gracefully
   +    shutdownHookThread = new Thread(() -> {
   +      LOG.info("Shutdown hook triggered, initiating graceful shutdown for {}", applicationName);
   +
   +      // Signal that shutdown is requested
   +      if (shutdownRequested.compareAndSet(false, true)) {
   +        LOG.info("Graceful shutdown initiated via shutdown hook.");
   +
   +        // Interrupt the server thread to wake it from any wait/sleep
   +        if (serverThread != null && serverThread.isAlive()) {
   +          serverThread.interrupt();
   +        }
   +
   +        // Wait for the server to complete shutdown (with timeout)
   +        long startTime = System.currentTimeMillis();
   +        while (!shutdownComplete.get()) {
   +          long elapsed = System.currentTimeMillis() - startTime;
   +          if (elapsed > SHUTDOWN_WAIT_TIMEOUT_MS) {
   +            LOG.warn("Shutdown hook timed out after {}ms waiting for {} to complete shutdown",
   +                elapsed, applicationName);
   +            break;
   +          }
   +          try {
   +            Thread.sleep(100);
   +          } catch (InterruptedException e) {
   +            LOG.warn("Shutdown hook interrupted while waiting for graceful shutdown");
   +            break;
   +          }
   +        }
   +        LOG.info("Shutdown hook completed for {}, shutdownComplete={}", applicationName, shutdownComplete.get());
   +      }
   +    }, applicationName + "-shutdown-hook");
   +    Runtime.getRuntime().addShutdownHook(shutdownHookThread);
   +
        serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
        serverThread.start();
        serverThread.join();
   @@ -226,6 +267,15 @@ public abstract class AbstractServer
          verificationThread.interrupt();
          verificationThread.join();
        }
   +
   +    // Try to remove the shutdown hook if we're exiting normally (not via SIGTERM)
   +    // This prevents the hook from running when the server exits cleanly
   +    try {
   +      Runtime.getRuntime().removeShutdownHook(shutdownHookThread);
   +    } catch (IllegalStateException e) {
   +      // Shutdown is already in progress, hook is already running or about to run
   +    }
   +
        log.info(getClass().getSimpleName() + " process shut down.");
        Throwable thrown = err.get();
        if (thrown != null) {
   
   ```

