teamconfx opened a new issue, #6046:
URL: https://github.com/apache/accumulo/issues/6046
**Describe the bug**
Flushed mutations are intermittently lost after a tablet server restart.
Data that was successfully acknowledged by `BatchWriter.flush()` may not be
visible after the tablet server restarts and the tablet is reassigned. This
violates Accumulo's durability guarantees when using the default `SYNC`
durability setting.
**Versions (OS, Maven, Java, and others, as appropriate):**
- Affected version(s) of this project: 2.1.4
- OS: Linux (Ubuntu)
- Java: OpenJDK 17
- Maven: 3.x
**Root Cause Analysis**
The root cause is a **missing JVM shutdown hook** in `AbstractServer` that
would trigger graceful shutdown when SIGTERM is received.
**The Problem:**
1. When SIGTERM is sent to the tablet server process (via
`Process.destroy()`), the JVM starts its shutdown sequence
2. `AbstractServer.gracefulShutdown()` sets `shutdownRequested` to true, but
this method is only called via RPC or when an `InterruptedException` is caught,
**not when SIGTERM is received**
3. Without a shutdown hook, `shutdownRequested` remains false during
SIGTERM-initiated shutdown
4. The main server loop in `TabletServer.run()` doesn't know it should
perform graceful shutdown
5. The JVM terminates before tablets are properly unloaded and data is
flushed to files
6. Data that was only in memory/WAL (not yet minor-compacted) is lost
**Real-World Production Scenarios:**
This bug can occur in production when tablet servers are stopped using
standard OS/container process management instead of Accumulo's admin tools:
1. **Kubernetes/Container Orchestration**: When a pod is terminated during
rolling deployments, node draining, resource pressure, or pod eviction,
Kubernetes sends SIGTERM directly to the process. K8s has no knowledge of
Accumulo's RPC-based shutdown mechanism.
2. **Docker**: Running `docker stop <container>` sends SIGTERM to the
container's main process.
3. **Systemd Service Management**: Running `systemctl stop accumulo-tserver`
sends SIGTERM to the service process.
4. **Cloud Provider VM Lifecycle Events**: AWS Spot Instance interruptions,
GCP Preemptible VM shutdowns, and Azure Spot VM evictions all send SIGTERM to
processes before terminating the VM. These events can happen at any time, with
little advance notice (from tens of seconds to a couple of minutes).
5. **Manual Process Termination**: An operator running `kill <pid>` sends
SIGTERM by default.
In all these cases, Accumulo's `gracefulShutdown()` RPC method is never
called, so the tablet server exits without properly unloading tablets. This is
particularly dangerous in cloud environments where VM preemption is common and
expected.
The Accumulo admin tools (`accumulo admin stop`) use RPC to call
`gracefulShutdown()`, but external process managers and orchestration systems
cannot use this mechanism.
**Key Code Locations:**
1. `server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java`:
   - Lines 162-184: `gracefulShutdown()` sets `shutdownRequested` but is not
     called on SIGTERM
   - Lines 63-64: `shutdownRequested` and `shutdownComplete` AtomicBooleans
     exist but aren't used in a shutdown hook
   - Lines 218-239: `runServer()` waits for the server thread but doesn't
     register a shutdown hook
2. `server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java`:
   - Lines 953-974: Graceful shutdown logic that waits for tablets to be
     unloaded; it only runs if `shutdownRequested` is true
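To make the contract behind these locations concrete, here is a self-contained
sketch. `ToyServer` is hypothetical and not Accumulo code; it only loosely
models what the issue describes: `gracefulShutdown()` merely sets
`shutdownRequested`, the main loop treats an `InterruptedException` as a
shutdown request, and cleanup (standing in for tablet unloading) runs before
`shutdownComplete` is set. Nothing in the loop reacts to SIGTERM on its own;
something has to set the flag or interrupt the thread.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a server main loop; not Accumulo code.
public class ToyServer implements Runnable {
  private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
  private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);
  private volatile int unloaded = 0;

  // Analogous to gracefulShutdown(): it only sets the flag. Nothing calls this on
  // SIGTERM unless a shutdown hook (or an RPC) does so.
  public void gracefulShutdown() {
    shutdownRequested.set(true);
  }

  public boolean isShutdownComplete() {
    return shutdownComplete.get();
  }

  public int unloadedTablets() {
    return unloaded;
  }

  @Override
  public void run() {
    while (!shutdownRequested.get()) {
      try {
        TimeUnit.MILLISECONDS.sleep(50); // periodic work would go here
      } catch (InterruptedException e) {
        // An interrupt is converted into a graceful shutdown request
        gracefulShutdown();
      }
    }
    // Stand-in for unloading tablets / flushing in-memory data before exit
    unloaded = 3;
    shutdownComplete.set(true);
  }

  public static void main(String[] args) throws InterruptedException {
    ToyServer server = new ToyServer();
    Thread serverThread = new Thread(server, "toy-server");
    serverThread.start();
    serverThread.interrupt(); // what a shutdown hook could do on SIGTERM
    serverThread.join(5_000);
    // prints "complete=true unloaded=3"
    System.out.println("complete=" + server.isShutdownComplete()
        + " unloaded=" + server.unloadedTablets());
  }
}
```

If the JVM halted before the loop observed the request, the cleanup after the
loop would simply never run, which is the failure mode described above.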
**To Reproduce**
Add the following test to
`test/src/main/java/org/apache/accumulo/test/functional/`:
```java
package org.apache.accumulo.test.functional;

import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.ProcessReference;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;

public class DataLossAfterRestartIT extends AccumuloClusterHarness {

  @Override
  protected Duration defaultTimeout() {
    return Duration.ofMinutes(5);
  }

  @Test
  public void testDataLossAfterRestart() throws Exception {
    String tableName = getUniqueNames(1)[0];
    try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
      // Create table with logical time and a split
      TreeSet<Text> splits = new TreeSet<>();
      splits.add(new Text("m"));
      client.tableOperations().create(tableName,
          new NewTableConfiguration().setTimeType(TimeType.LOGICAL).withSplits(splits));

      BatchWriter bw = client.createBatchWriter(tableName);

      // Write first mutation (row 'a')
      Mutation m1 = new Mutation("a");
      m1.put("cf", "cq", "v");
      bw.addMutation(m1);
      bw.flush();

      // Perform merge
      client.tableOperations().merge(tableName, null, null);

      // Write second mutation (row 'b')
      Mutation m2 = new Mutation("b");
      m2.put("cf", "cq", "v");
      bw.addMutation(m2);
      bw.flush();

      // At this point, both row 'a' and row 'b' should be persisted (SYNC durability)

      // Restart a tablet server using killProcess and start
      MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl) getCluster();
      List<ProcessReference> tservers =
          new ArrayList<>(cluster.getProcesses().get(ServerType.TABLET_SERVER));
      if (!tservers.isEmpty()) {
        ProcessReference tserver = tservers.get(0);
        // Kill the tablet server (sends SIGTERM, waits up to 30 seconds)
        cluster.killProcess(ServerType.TABLET_SERVER, tserver);
        // Start a replacement tablet server
        cluster.getClusterControl().start(ServerType.TABLET_SERVER);
        // Wait for cluster to stabilize
        client.instanceOperations().waitForBalance();
      }
      bw.close();

      // Now scan for row 'b'; this should succeed but intermittently fails
      try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
        scanner.setRange(new Range("b"));
        if (!scanner.iterator().hasNext()) {
          // Report what data does exist
          try (Scanner fullScan = client.createScanner(tableName, Authorizations.EMPTY)) {
            fullScan.setRange(new Range());
            StringBuilder sb = new StringBuilder();
            int count = 0;
            for (var entry : fullScan) {
              sb.append(entry.getKey().getRow()).append(" ");
              count++;
            }
            fail("DATA LOSS DETECTED: Row 'b' not found after restart. "
                + "Full table scan found " + count + " entries: " + sb);
          }
        }
        // Verify the data
        long time = scanner.iterator().next().getKey().getTimestamp();
        assertTrue(time > 0, "Expected positive timestamp");
      }
    }
  }
}
```
Run the test (it may take several runs to reproduce; about two out of three
runs fail):
```bash
mvn -pl :accumulo-test failsafe:integration-test \
-Dit.test=DataLossAfterRestartIT#testDataLossAfterRestart
```
**Expected behavior**
After `BatchWriter.flush()` returns with the default `SYNC` durability, the
data should be persisted to the WAL and should survive a tablet server restart.
When the tablet is reassigned and WAL recovery completes, all flushed data
should be visible to subsequent scans.
**Screenshots**
Debug output showing data loss:
```
DATA LOSS DETECTED: Row 'b' not found after restart. Full table scan found 1 entries: a
```
This shows that row 'a' (written earlier and likely minor-compacted during the
merge) survives, while row 'b' (written and flushed just before the restart,
and present only in the WAL) is lost.
**Additional context**
- The failure is intermittent (~2 out of 3 runs fail), suggesting
timing-dependent behavior
- The default durability is `SYNC`, which should guarantee data persistence
before `flush()` returns
- The `killProcess()` method uses `stopProcessWithTimeout(process, 30,
TimeUnit.SECONDS)`, which calls `process.destroy()` (SIGTERM)
- Without a shutdown hook, the tablet server doesn't properly unload tablets
before the JVM exits
**Proposed Fix**
Add a JVM shutdown hook in `AbstractServer.runServer()` that:
1. Sets `shutdownRequested` to true when SIGTERM is received
2. Waits for `shutdownComplete` to be set before allowing the JVM to exit
See patch file: `patches/BUG-GROUP-3.patch`
**Fix Verification**
After applying the patch, the test was run 10 times and passed all 10 runs:
- Before fix: ~33% pass rate (1 out of 3 runs)
- After fix: 100% pass rate (10 out of 10 runs)
The shutdown hook ensures that when SIGTERM is received:
1. `shutdownRequested` is set to true, signaling the server to begin
graceful shutdown
2. The server thread is interrupted to wake it from any wait/sleep
3. The hook waits up to 30 seconds for `shutdownComplete` to be set,
allowing tablets to be properly unloaded
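The three hook steps above can be sketched as a standalone helper. This is a
hedged illustration, not the patch itself: `GracefulShutdownHook`, `runHook()`
and `register()` are hypothetical names. It deliberately differs from the patch
in two ways: it measures the timeout with the monotonic `System.nanoTime()`
instead of `System.currentTimeMillis()`, and it still waits when
`shutdownRequested` was already set by another path (the patch's
`compareAndSet` branch skips the wait in that case, so the JVM could halt while
an RPC-initiated unload is still in progress). The hook body is a plain method
so it can be exercised without sending a real SIGTERM.

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper sketching the shutdown-hook handshake; not the patch or an Accumulo API.
public final class GracefulShutdownHook {
  private final AtomicBoolean shutdownRequested;
  private final AtomicBoolean shutdownComplete;
  private final Thread serverThread;
  private final Duration timeout;

  public GracefulShutdownHook(AtomicBoolean shutdownRequested, AtomicBoolean shutdownComplete,
      Thread serverThread, Duration timeout) {
    this.shutdownRequested = shutdownRequested;
    this.shutdownComplete = shutdownComplete;
    this.serverThread = serverThread;
    this.timeout = timeout;
  }

  /** Hook body. Returns true if the server reported completion before the timeout. */
  public boolean runHook() {
    // 1. Request graceful shutdown; only the first requester interrupts the server thread
    boolean firstRequest = shutdownRequested.compareAndSet(false, true);
    // 2. Wake the server thread from any sleep or wait
    if (firstRequest && serverThread.isAlive()) {
      serverThread.interrupt();
    }
    // 3. Wait (bounded) for the server to finish, even if another path
    //    (such as an RPC) had already requested shutdown
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!shutdownComplete.get()) {
      if (System.nanoTime() - deadline >= 0) {
        return false; // timed out; the JVM halts once all hooks return
      }
      try {
        TimeUnit.MILLISECONDS.sleep(100);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve interrupt status
        return false;
      }
    }
    return true;
  }

  /** Registers the hook body to run when the JVM begins shutting down (e.g. on SIGTERM). */
  public Thread register(String name) {
    Thread hook = new Thread(this::runHook, name);
    Runtime.getRuntime().addShutdownHook(hook);
    return hook;
  }
}
```

A server would call `register(...)` once before starting its main thread, which
is where the patch calls `Runtime.getRuntime().addShutdownHook(...)`.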
Stack trace:
```
java.util.NoSuchElementException
	at org.apache.accumulo.core.clientImpl.ScannerIterator.next(ScannerIterator.java:123)
	at org.apache.accumulo.core.clientImpl.ScannerIterator.next(ScannerIterator.java:46)
	at org.apache.accumulo.test.functional.LogicalTimeIT.runMergeTest(LogicalTimeIT.java:126)
	at org.apache.accumulo.test.functional.LogicalTimeIT.run(LogicalTimeIT.java:52)
```
**Proposed Fix (patch):**
```diff
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -21,6 +21,7 @@ package org.apache.accumulo.server;
 import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.accumulo.core.Constants;
@@ -59,6 +60,9 @@ public abstract class AbstractServer
   protected final long idleReportingPeriodNanos;
   private volatile long idlePeriodStartNanos = 0L;
   private volatile Thread serverThread;
+  private volatile Thread shutdownHookThread;
+  private static final long SHUTDOWN_WAIT_TIMEOUT_MS = 30000; // 30 seconds max wait
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractServer.class);
   private volatile Thread verificationThread;
   private final AtomicBoolean shutdownRequested = new AtomicBoolean(false);
   private final AtomicBoolean shutdownComplete = new AtomicBoolean(false);
@@ -215,10 +219,47 @@ public abstract class AbstractServer
    *   ServiceLock.unlock(serverLock);
    * }
    * </pre>
+   *
+   * This method also registers a JVM shutdown hook that triggers graceful shutdown
+   * when SIGTERM is received, ensuring data is properly flushed before the process exits.
    */
   public void runServer() throws Exception {
     final AtomicReference<Throwable> err = new AtomicReference<>();
     serverThread = new Thread(TraceUtil.wrap(this), applicationName);
+
+    // Register shutdown hook to handle SIGTERM gracefully
+    shutdownHookThread = new Thread(() -> {
+      LOG.info("Shutdown hook triggered, initiating graceful shutdown for {}", applicationName);
+
+      // Signal that shutdown is requested
+      if (shutdownRequested.compareAndSet(false, true)) {
+        LOG.info("Graceful shutdown initiated via shutdown hook.");
+
+        // Interrupt the server thread to wake it from any wait/sleep
+        if (serverThread != null && serverThread.isAlive()) {
+          serverThread.interrupt();
+        }
+
+        // Wait for the server to complete shutdown (with timeout)
+        long startTime = System.currentTimeMillis();
+        while (!shutdownComplete.get()) {
+          long elapsed = System.currentTimeMillis() - startTime;
+          if (elapsed > SHUTDOWN_WAIT_TIMEOUT_MS) {
+            LOG.warn("Shutdown hook timed out after {}ms waiting for {} to complete shutdown",
+                elapsed, applicationName);
+            break;
+          }
+          try {
+            Thread.sleep(100);
+          } catch (InterruptedException e) {
+            LOG.warn("Shutdown hook interrupted while waiting for graceful shutdown");
+            break;
+          }
+        }
+        LOG.info("Shutdown hook completed for {}, shutdownComplete={}", applicationName, shutdownComplete.get());
+      }
+    }, applicationName + "-shutdown-hook");
+    Runtime.getRuntime().addShutdownHook(shutdownHookThread);
+
     serverThread.setUncaughtExceptionHandler((thread, exception) -> err.set(exception));
     serverThread.start();
     serverThread.join();
@@ -226,6 +267,15 @@ public abstract class AbstractServer
       verificationThread.interrupt();
       verificationThread.join();
     }
+
+    // Try to remove the shutdown hook if we're exiting normally (not via SIGTERM)
+    // This prevents the hook from running when the server exits cleanly
+    try {
+      Runtime.getRuntime().removeShutdownHook(shutdownHookThread);
+    } catch (IllegalStateException e) {
+      // Shutdown is already in progress, hook is already running or about to run
+    }
+
     log.info(getClass().getSimpleName() + " process shut down.");
     Throwable thrown = err.get();
     if (thrown != null) {
```
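The cleanup at the end of the patch relies on documented `java.lang.Runtime`
behavior: `removeShutdownHook` returns `true` if the hook was registered and is
now de-registered, returns `false` if it was not registered, and throws
`IllegalStateException` if the JVM is already shutting down. The small demo
below (the class and method names are hypothetical) exercises the first two
cases and handles the third the same way the patch does.

```java
// Hypothetical demo class; it illustrates JDK shutdown-hook semantics only.
public class ShutdownHookCleanupDemo {

  /** Removes the hook if possible; false if it was not registered or shutdown has begun. */
  static boolean tryRemoveHook(Thread hook) {
    try {
      return Runtime.getRuntime().removeShutdownHook(hook);
    } catch (IllegalStateException e) {
      // The JVM is already shutting down; the hook is running or about to run
      return false;
    }
  }

  public static void main(String[] args) {
    Thread hook = new Thread(() -> System.out.println("hook ran"), "demo-shutdown-hook");
    Runtime.getRuntime().addShutdownHook(hook);

    // Normal exit path: de-register so the hook does not run when the JVM exits
    System.out.println("first remove: " + tryRemoveHook(hook)); // prints "first remove: true"
    System.out.println("second remove: " + tryRemoveHook(hook)); // prints "second remove: false"
  }
}
```

Because the hook is removed on the normal exit path, "hook ran" is never
printed; only a JVM shutdown that starts before `removeShutdownHook` is called
(for example, SIGTERM) would run it.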