This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ae49a7ff027178a387d39f048e1090bcffe11162
Merge: 8384b9be6a ae6085c346
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Wed Mar 13 15:12:41 2024 -0400

    Merge branch 'main' into elasticity

 .../accumulo/core/fate/AbstractFateStore.java      |  24 +--
 .../org/apache/accumulo/core/fate/AdminUtil.java   |   6 +-
 .../java/org/apache/accumulo/core/fate/Fate.java   |  15 +-
 .../org/apache/accumulo/core/fate/FateCleaner.java |   3 +-
 .../org/apache/accumulo/core/fate/FateStore.java   |   6 +-
 .../accumulo/core/fate/WrappedFateTxStore.java     |   6 +-
 .../accumulo/core/file/rfile/GenerateSplits.java   |  32 +++-
 .../apache/accumulo/core/util/time/NanoTime.java   | 104 +++++++++++++
 .../apache/accumulo/core/fate/FateCleanerTest.java |  21 ++-
 .../org/apache/accumulo/core/fate/TestStore.java   |   4 +-
 .../core/file/rfile/GenerateSplitsTest.java        |  64 +++++++-
 .../accumulo/core/util/time/NanoTimeTest.java      | 162 +++++++++++++++++++++
 .../java/org/apache/accumulo/manager/Manager.java  |   9 +-
 .../test/compaction/ExternalCompaction_1_IT.java   |   6 +-
 .../accumulo/test/fate/accumulo/FateStoreIT.java   |  23 ++-
 15 files changed, 410 insertions(+), 75 deletions(-)

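The central API change picked up by this merge is that FateStore.FateTxStore#unreserve now takes a java.time.Duration instead of a (long, TimeUnit) pair, and AbstractFateStore's deferred map now stores NanoTime deadlines instead of raw System.nanoTime() longs. A minimal sketch of the call-site change, assuming a FateTxStore<T> named txStore (illustrative only, not a listing of the affected call sites):

    // before this merge: defer time expressed as a long plus a TimeUnit
    txStore.unreserve(0, TimeUnit.MILLISECONDS);
    txStore.unreserve(30, TimeUnit.SECONDS);

    // after this merge: defer time expressed as a java.time.Duration
    txStore.unreserve(Duration.ZERO);
    txStore.unreserve(Duration.ofSeconds(30));
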
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index 702313f5ab,0000000000..0cad25f857
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@@ -1,494 -1,0 +1,494 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import static java.nio.charset.StandardCharsets.UTF_8;
 +
 +import java.io.ByteArrayInputStream;
 +import java.io.ByteArrayOutputStream;
 +import java.io.IOException;
 +import java.io.ObjectInputStream;
 +import java.io.ObjectOutputStream;
 +import java.io.Serializable;
 +import java.io.UncheckedIOException;
++import java.time.Duration;
 +import java.util.EnumSet;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.Map;
 +import java.util.Objects;
 +import java.util.Optional;
 +import java.util.Set;
- import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.atomic.AtomicLong;
 +import java.util.function.Consumer;
 +import java.util.stream.Stream;
 +
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.apache.accumulo.core.util.Pair;
++import org.apache.accumulo.core.util.time.NanoTime;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +import com.google.common.hash.HashCode;
 +import com.google.common.hash.Hashing;
 +
 +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
 +
 +public abstract class AbstractFateStore<T> implements FateStore<T> {
 +
 +  private static final Logger log = LoggerFactory.getLogger(AbstractFateStore.class);
 +
 +  // Default maximum size of 100,000 transactions before deferral is stopped and
 +  // all existing transactions are processed immediately again
 +  public static final int DEFAULT_MAX_DEFERRED = 100_000;
 +
 +  public static final FateIdGenerator DEFAULT_FATE_ID_GENERATOR = new FateIdGenerator() {
 +    @Override
 +    public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey) {
 +      HashCode hashCode = Hashing.murmur3_128().hashBytes(fateKey.getSerialized());
 +      long tid = hashCode.asLong() & 0x7fffffffffffffffL;
 +      return FateId.from(instanceType, tid);
 +    }
 +  };
 +
 +  protected final Set<FateId> reserved;
-   protected final Map<FateId,Long> deferred;
++  protected final Map<FateId,NanoTime> deferred;
 +  private final int maxDeferred;
 +  private final AtomicBoolean deferredOverflow = new AtomicBoolean();
 +  private final FateIdGenerator fateIdGenerator;
 +
 +  // This is incremented each time a transaction was unreserved that was not new
 +  protected final SignalCount unreservedNonNewCount = new SignalCount();
 +
 +  // This is incremented each time a transaction is unreserved that was runnable
 +  protected final SignalCount unreservedRunnableCount = new SignalCount();
 +
 +  public AbstractFateStore() {
 +    this(DEFAULT_MAX_DEFERRED, DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  public AbstractFateStore(int maxDeferred, FateIdGenerator fateIdGenerator) {
 +    this.maxDeferred = maxDeferred;
 +    this.fateIdGenerator = Objects.requireNonNull(fateIdGenerator);
 +    this.reserved = new HashSet<>();
 +    this.deferred = new HashMap<>();
 +  }
 +
 +  public static byte[] serialize(Object o) {
 +    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
 +        ObjectOutputStream oos = new ObjectOutputStream(baos)) {
 +      oos.writeObject(o);
 +      return baos.toByteArray();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    }
 +  }
 +
 +  @SuppressFBWarnings(value = "OBJECT_DESERIALIZATION",
 +      justification = "unsafe to store arbitrary serialized objects like this, but needed for now"
 +          + " for backwards compatibility")
 +  public static Object deserialize(byte[] ser) {
 +    try (ByteArrayInputStream bais = new ByteArrayInputStream(ser);
 +        ObjectInputStream ois = new ObjectInputStream(bais)) {
 +      return ois.readObject();
 +    } catch (IOException e) {
 +      throw new UncheckedIOException(e);
 +    } catch (ReflectiveOperationException e) {
 +      throw new IllegalStateException(e);
 +    }
 +  }
 +
 +  /**
 +   * Attempt to reserve the fate transaction.
 +   *
 +   * @param fateId The FateId
 +   * @return An Optional containing the FateTxStore if the transaction was successfully reserved, or
 +   *         an empty Optional if the transaction was already reserved.
 +   */
 +  @Override
 +  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
 +    synchronized (this) {
 +      if (!reserved.contains(fateId)) {
 +        return Optional.of(reserve(fateId));
 +      }
 +      return Optional.empty();
 +    }
 +  }
 +
 +  @Override
 +  public FateTxStore<T> reserve(FateId fateId) {
 +    synchronized (AbstractFateStore.this) {
 +      while (reserved.contains(fateId)) {
 +        try {
 +          AbstractFateStore.this.wait(100);
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +          throw new IllegalStateException(e);
 +        }
 +      }
 +
 +      reserved.add(fateId);
 +      return newFateTxStore(fateId, true);
 +    }
 +  }
 +
 +  @Override
 +  public void runnable(AtomicBoolean keepWaiting, Consumer<FateId> idConsumer) {
 +
 +    AtomicLong seen = new AtomicLong(0);
 +
 +    while (keepWaiting.get() && seen.get() == 0) {
 +      final long beforeCount = unreservedRunnableCount.getCount();
 +      final boolean beforeDeferredOverflow = deferredOverflow.get();
 +
 +      try (Stream<FateIdStatus> transactions = getTransactions()) {
 +        transactions.filter(fateIdStatus -> isRunnable(fateIdStatus.getStatus()))
 +            .map(FateIdStatus::getFateId).filter(fateId -> {
 +              synchronized (AbstractFateStore.this) {
 +                var deferredTime = deferred.get(fateId);
 +                if (deferredTime != null) {
-                   if ((deferredTime - System.nanoTime()) >= 0) {
++                  if (deferredTime.elapsed().isNegative()) {
++                    // negative elapsed time indicates the deferral time is in the future
 +                    return false;
 +                  } else {
 +                    deferred.remove(fateId);
 +                  }
 +                }
 +                return !reserved.contains(fateId);
 +              }
 +            }).forEach(fateId -> {
 +              seen.incrementAndGet();
 +              idConsumer.accept(fateId);
 +            });
 +      }
 +
 +      // If deferredOverflow was previously marked true then the deferred map
 +      // would have been cleared and seen.get() should be greater than 0 as there would
 +      // be a lot of transactions to process in the previous run, so we won't be sleeping here
 +      if (seen.get() == 0) {
 +        if (beforeCount == unreservedRunnableCount.getCount()) {
 +          long waitTime = 5000;
 +          synchronized (AbstractFateStore.this) {
 +            if (!deferred.isEmpty()) {
-               long currTime = System.nanoTime();
-               long minWait =
-                   deferred.values().stream().mapToLong(l -> l - currTime).min().getAsLong();
-               waitTime = TimeUnit.MILLISECONDS.convert(minWait, TimeUnit.NANOSECONDS);
++              var now = NanoTime.now();
++              waitTime = deferred.values().stream()
++                  .mapToLong(nanoTime -> nanoTime.subtract(now).toMillis()).min().getAsLong();
 +            }
 +          }
 +
 +          if (waitTime > 0) {
 +            unreservedRunnableCount.waitFor(count -> count != beforeCount, waitTime,
 +                keepWaiting::get);
 +          }
 +        }
 +      }
 +
 +      // Reset the current state only if it matches the state before the execution.
 +      // This is to avoid a race condition where the flag was set during the run.
 +      // We should ensure at least one of the FATE executors will run through the
 +      // entire transaction list first before clearing the flag and allowing more
 +      // deferred entries into the map again. In other words, if the before state
 +      // was false and during the execution at some point it was marked true this would
 +      // not reset until after the next run
 +      deferredOverflow.compareAndSet(beforeDeferredOverflow, false);
 +    }
 +  }
 +
 +  @Override
 +  public Stream<FateIdStatus> list() {
 +    return getTransactions();
 +  }
 +
 +  @Override
 +  public ReadOnlyFateTxStore<T> read(FateId fateId) {
 +    return newFateTxStore(fateId, false);
 +  }
 +
 +  protected boolean isRunnable(TStatus status) {
 +    return status == TStatus.IN_PROGRESS || status == TStatus.FAILED_IN_PROGRESS
 +        || status == TStatus.SUBMITTED;
 +  }
 +
 +  public static abstract class FateIdStatusBase implements FateIdStatus {
 +    private final FateId fateId;
 +
 +    public FateIdStatusBase(FateId fateId) {
 +      this.fateId = fateId;
 +    }
 +
 +    @Override
 +    public FateId getFateId() {
 +      return fateId;
 +    }
 +  }
 +
 +  @Override
 +  public boolean isDeferredOverflow() {
 +    return deferredOverflow.get();
 +  }
 +
 +  @Override
 +  public int getDeferredCount() {
 +    // This method is primarily used right now for unit testing but
 +    // if this synchronization becomes an issue we could add an atomic
 +    // counter instead to track it separately so we don't need to lock
 +    synchronized (AbstractFateStore.this) {
 +      return deferred.size();
 +    }
 +  }
 +
 +  private Optional<FateId> create(FateKey fateKey) {
 +    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
 +
 +    try {
 +      create(fateId, fateKey);
 +    } catch (IllegalStateException e) {
 +      Pair<TStatus,Optional<FateKey>> statusAndKey = getStatusAndKey(fateId);
 +      TStatus status = statusAndKey.getFirst();
 +      Optional<FateKey> tFateKey = statusAndKey.getSecond();
 +
 +      // Case 1: Status is NEW so this is unseeded, we can return and allow the calling code
 +      // to reserve/seed as long as the existing key is the same and not different as that would
 +      // mean a collision
 +      if (status == TStatus.NEW) {
 +        Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing from tid %s",
 +            fateId.getTid());
 +        Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()),
 +            "Collision detected for tid %s", fateId.getTid());
 +        // Case 2: Status is some other state which means already in progress
 +        // so we can just log and return empty optional
 +      } else {
 +        log.trace("Existing transaction {} already exists for key {} with status {}", fateId,
 +            fateKey, status);
 +        return Optional.empty();
 +      }
 +    }
 +
 +    return Optional.of(fateId);
 +  }
 +
 +  @Override
 +  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
 +    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
 +    final Optional<FateTxStore<T>> txStore;
 +
 +    // First make sure we can reserve in memory the fateId, if not
 +    // we can return an empty Optional as it is reserved and in progress
 +    // This reverses the usual order of creation and then reservation but
 +    // this prevents a race condition by ensuring we can reserve first.
 +    // This will create the FateTxStore before creation but this object
 +    // is not exposed until after creation is finished so there should not
 +    // be any errors.
 +    final Optional<FateTxStore<T>> reservedTxStore;
 +    synchronized (this) {
 +      reservedTxStore = tryReserve(fateId);
 +    }
 +
 +    // If present we were able to reserve so try and create
 +    if (reservedTxStore.isPresent()) {
 +      try {
 +        var fateIdFromCreate = create(fateKey);
 +        if (fateIdFromCreate.isPresent()) {
 +          Preconditions.checkState(fateId.equals(fateIdFromCreate.orElseThrow()),
 +              "Transaction creation returned unexpected %s, expected %s", fateIdFromCreate, fateId);
 +          txStore = reservedTxStore;
 +        } else {
 +          // If we already exist in a non-new state, then un-reserve and an empty
 +          // Optional will be returned. This is expected to happen when the
 +          // system is busy and operations are not running, and we keep seeding them
 +          synchronized (this) {
 +            reserved.remove(fateId);
 +          }
 +          txStore = Optional.empty();
 +        }
 +      } catch (Exception e) {
 +        // Clean up the reservation if the creation failed
 +        // And then throw error
 +        synchronized (this) {
 +          reserved.remove(fateId);
 +        }
 +        if (e instanceof IllegalStateException) {
 +          throw e;
 +        } else {
 +          throw new IllegalStateException(e);
 +        }
 +      }
 +    } else {
 +      // Could not reserve so return empty
 +      log.trace("Another thread currently has transaction {} key {} reserved", fateId, fateKey);
 +      txStore = Optional.empty();
 +    }
 +
 +    return txStore;
 +  }
 +
 +  protected abstract void create(FateId fateId, FateKey fateKey);
 +
 +  protected abstract Pair<TStatus,Optional<FateKey>> getStatusAndKey(FateId fateId);
 +
 +  protected abstract Stream<FateIdStatus> getTransactions();
 +
 +  protected abstract TStatus _getStatus(FateId fateId);
 +
 +  protected abstract Optional<FateKey> getKey(FateId fateId);
 +
 +  protected abstract FateTxStore<T> newFateTxStore(FateId fateId, boolean isReserved);
 +
 +  protected abstract FateInstanceType getInstanceType();
 +
 +  protected abstract class AbstractFateTxStoreImpl<T> implements FateTxStore<T> {
 +    protected final FateId fateId;
 +    protected final boolean isReserved;
 +
 +    protected TStatus observedStatus = null;
 +
 +    protected AbstractFateTxStoreImpl(FateId fateId, boolean isReserved) {
 +      this.fateId = fateId;
 +      this.isReserved = isReserved;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      Preconditions.checkState(!isReserved,
 +          "Attempted to wait for status change while reserved " + fateId);
 +      while (true) {
 +
 +        long countBefore = unreservedNonNewCount.getCount();
 +
 +        TStatus status = _getStatus(fateId);
 +        if (expected.contains(status)) {
 +          return status;
 +        }
 +
 +        unreservedNonNewCount.waitFor(count -> count != countBefore, 1000, () -> true);
 +      }
 +    }
 +
 +    @Override
-     public void unreserve(long deferTime, TimeUnit timeUnit) {
-       deferTime = TimeUnit.NANOSECONDS.convert(deferTime, timeUnit);
++    public void unreserve(Duration deferTime) {
 +
-       if (deferTime < 0) {
++      if (deferTime.isNegative()) {
 +        throw new IllegalArgumentException("deferTime < 0 : " + deferTime);
 +      }
 +
 +      synchronized (AbstractFateStore.this) {
 +        if (!reserved.remove(fateId)) {
 +          throw new IllegalStateException("Tried to unreserve id that was not reserved " + fateId);
 +        }
 +
 +        // notify any threads waiting to reserve
 +        AbstractFateStore.this.notifyAll();
 +
 +        // If deferred map has overflowed then skip adding to the deferred map
 +        // and clear the map and set the flag. This will cause the next execution
 +        // of runnable to process all the transactions and to not defer as we
 +        // have a large backlog and want to make progress
-         if (deferTime > 0 && !deferredOverflow.get()) {
++        if (deferTime.compareTo(Duration.ZERO) > 0 && !deferredOverflow.get()) {
 +          if (deferred.size() >= maxDeferred) {
 +            log.info(
 +                "Deferred map overflowed with size {}, clearing and setting deferredOverflow to true",
 +                deferred.size());
 +            deferredOverflow.set(true);
 +            deferred.clear();
 +          } else {
-             deferred.put(fateId, System.nanoTime() + deferTime);
++            deferred.put(fateId, NanoTime.nowPlus(deferTime));
 +          }
 +        }
 +      }
 +
 +      if (observedStatus != null && isRunnable(observedStatus)) {
 +        unreservedRunnableCount.increment();
 +      }
 +
 +      if (observedStatus != TStatus.NEW) {
 +        unreservedNonNewCount.increment();
 +      }
 +    }
 +
 +    protected void verifyReserved(boolean isWrite) {
 +      if (!isReserved && isWrite) {
 +        throw new IllegalStateException("Attempted write on unreserved FATE transaction.");
 +      }
 +
 +      if (isReserved) {
 +        synchronized (AbstractFateStore.this) {
 +          if (!reserved.contains(fateId)) {
 +            throw new IllegalStateException("Tried to operate on unreserved transaction " + fateId);
 +          }
 +        }
 +      }
 +    }
 +
 +    @Override
 +    public TStatus getStatus() {
 +      verifyReserved(false);
 +      var status = _getStatus(fateId);
 +      observedStatus = status;
 +      return status;
 +    }
 +
 +    @Override
 +    public Optional<FateKey> getKey() {
 +      verifyReserved(false);
 +      return AbstractFateStore.this.getKey(fateId);
 +    }
 +
 +    @Override
 +    public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
 +      verifyReserved(false);
 +      return AbstractFateStore.this.getStatusAndKey(fateId);
 +    }
 +
 +    @Override
 +    public FateId getID() {
 +      return fateId;
 +    }
 +  }
 +
 +  public interface FateIdGenerator {
 +    FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
 +  }
 +
 +  protected byte[] serializeTxInfo(Serializable so) {
 +    if (so instanceof String) {
 +      return ("S " + so).getBytes(UTF_8);
 +    } else {
 +      byte[] sera = serialize(so);
 +      byte[] data = new byte[sera.length + 2];
 +      System.arraycopy(sera, 0, data, 2, sera.length);
 +      data[0] = 'O';
 +      data[1] = ' ';
 +      return data;
 +    }
 +  }
 +
 +  protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
 +    if (data[0] == 'O') {
 +      byte[] sera = new byte[data.length - 2];
 +      System.arraycopy(data, 2, sera, 0, sera.length);
 +      return (Serializable) deserialize(sera);
 +    } else if (data[0] == 'S') {
 +      return new String(data, 2, data.length - 2, UTF_8);
 +    } else {
 +      throw new IllegalStateException("Bad node data " + txInfo);
 +    }
 +  }
 +}
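The new core/src/main/java/org/apache/accumulo/core/util/time/NanoTime.java (+104 lines in the diffstat) is not included in this message, so the sketch below only reconstructs the surface implied by the call sites above (now, nowPlus, elapsed, subtract); the field layout and method bodies are assumptions, not the actual class:

    import java.time.Duration;

    // Assumed shape of the NanoTime helper used by AbstractFateStore above.
    public final class NanoTime implements Comparable<NanoTime> {
      private final long nanos; // a System.nanoTime() value, only meaningful relative to others

      private NanoTime(long nanos) {
        this.nanos = nanos;
      }

      public static NanoTime now() {
        return new NanoTime(System.nanoTime());
      }

      public static NanoTime nowPlus(Duration duration) {
        return new NanoTime(System.nanoTime() + duration.toNanos());
      }

      // time elapsed since this point; negative while this point is still in the future
      public Duration elapsed() {
        return Duration.ofNanos(System.nanoTime() - nanos);
      }

      // this minus other, as a Duration
      public Duration subtract(NanoTime other) {
        return Duration.ofNanos(nanos - other.nanos);
      }

      @Override
      public int compareTo(NanoTime other) {
        // subtract before comparing so the ordering survives System.nanoTime() wrap-around
        return Long.compare(nanos - other.nanos, 0);
      }
    }
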
diff --cc core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index c18defb1ac,7cc0a9c004..f2e6da41e3
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@@ -32,13 -33,8 +33,12 @@@ import java.util.List
  import java.util.Map;
  import java.util.Map.Entry;
  import java.util.Set;
- import java.util.concurrent.TimeUnit;
 +import java.util.stream.Stream;
  
 -import org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus;
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
  import org.apache.accumulo.core.fate.zookeeper.FateLock;
  import org.apache.accumulo.core.fate.zookeeper.FateLock.FateLockPath;
  import org.apache.accumulo.core.fate.zookeeper.ZooReader;
@@@ -458,32 -432,26 +458,32 @@@ public class AdminUtil<T> 
        return false;
      }
      boolean state = false;
 -    zs.reserve(txid);
 -    TStatus ts = zs.getStatus(txid);
 -    switch (ts) {
 -      case UNKNOWN:
 -        System.out.printf("Invalid transaction ID: %016x%n", txid);
 -        break;
 -
 -      case SUBMITTED:
 -      case IN_PROGRESS:
 -      case NEW:
 -      case FAILED:
 -      case FAILED_IN_PROGRESS:
 -      case SUCCESSFUL:
 -        System.out.printf("Deleting transaction: %016x (%s)%n", txid, ts);
 -        zs.delete(txid);
 -        state = true;
 -        break;
 -    }
  
 -    zs.unreserve(txid, Duration.ZERO);
 +    // determine which store to use
 +    FateStore<T> store = stores.get(fateId.getType());
 +
 +    FateTxStore<T> txStore = store.reserve(fateId);
 +    try {
 +      TStatus ts = txStore.getStatus();
 +      switch (ts) {
 +        case UNKNOWN:
 +          System.out.println("Invalid transaction ID: " + fateId);
 +          break;
 +
 +        case SUBMITTED:
 +        case IN_PROGRESS:
 +        case NEW:
 +        case FAILED:
 +        case FAILED_IN_PROGRESS:
 +        case SUCCESSFUL:
 +          System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts);
 +          txStore.delete();
 +          state = true;
 +          break;
 +      }
 +    } finally {
-       txStore.unreserve(0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(Duration.ZERO);
 +    }
      return state;
    }
  
@@@ -501,40 -469,33 +501,40 @@@
        return false;
      }
      boolean state = false;
 -    zs.reserve(txid);
 -    TStatus ts = zs.getStatus(txid);
 -    switch (ts) {
 -      case UNKNOWN:
 -        System.out.printf("Invalid transaction ID: %016x%n", txid);
 -        break;
 -
 -      case SUBMITTED:
 -      case IN_PROGRESS:
 -      case NEW:
 -        System.out.printf("Failing transaction: %016x (%s)%n", txid, ts);
 -        zs.setStatus(txid, TStatus.FAILED_IN_PROGRESS);
 -        state = true;
 -        break;
 -
 -      case SUCCESSFUL:
 -        System.out.printf("Transaction already completed: %016x (%s)%n", txid, ts);
 -        break;
 -
 -      case FAILED:
 -      case FAILED_IN_PROGRESS:
 -        System.out.printf("Transaction already failed: %016x (%s)%n", txid, ts);
 -        state = true;
 -        break;
 +
 +    // determine which store to use
 +    FateStore<T> store = stores.get(fateId.getType());
 +
 +    FateTxStore<T> txStore = store.reserve(fateId);
 +    try {
 +      TStatus ts = txStore.getStatus();
 +      switch (ts) {
 +        case UNKNOWN:
 +          System.out.println("Invalid fate ID: " + fateId);
 +          break;
 +
 +        case SUBMITTED:
 +        case IN_PROGRESS:
 +        case NEW:
 +          System.out.printf("Failing transaction: %s (%s)%n", fateId, ts);
 +          txStore.setStatus(TStatus.FAILED_IN_PROGRESS);
 +          state = true;
 +          break;
 +
 +        case SUCCESSFUL:
 +          System.out.printf("Transaction already completed: %s (%s)%n", fateId, ts);
 +          break;
 +
 +        case FAILED:
 +        case FAILED_IN_PROGRESS:
 +          System.out.printf("Transaction already failed: %s (%s)%n", fateId, ts);
 +          state = true;
 +          break;
 +      }
 +    } finally {
-       txStore.unreserve(0, TimeUnit.MILLISECONDS);
++      txStore.unreserve(Duration.ZERO);
      }
  
 -    zs.unreserve(txid, Duration.ZERO);
      return state;
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index e5be68dbb2,4fe07bb8b2..f4e9728df3
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@@ -19,22 -19,20 +19,23 @@@
  package org.apache.accumulo.core.fate;
  
  import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
 +import static java.util.concurrent.TimeUnit.MILLISECONDS;
  import static java.util.concurrent.TimeUnit.MINUTES;
  import static java.util.concurrent.TimeUnit.SECONDS;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.FAILED_IN_PROGRESS;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.IN_PROGRESS;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.NEW;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUBMITTED;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.SUCCESSFUL;
 -import static org.apache.accumulo.core.fate.ReadOnlyTStore.TStatus.UNKNOWN;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRESS;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUCCESSFUL;
 +import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
  import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;
  
+ import java.time.Duration;
  import java.util.EnumSet;
 +import java.util.Optional;
  import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.LinkedTransferQueue;
  import java.util.concurrent.RejectedExecutionException;
  import java.util.concurrent.ScheduledThreadPoolExecutor;
  import java.util.concurrent.ThreadPoolExecutor;
@@@ -204,8 -132,8 +205,8 @@@ public class Fate<T> 
          } catch (Exception e) {
            runnerLog.error("Uncaught exception in FATE runner thread.", e);
          } finally {
 -          if (tid != null) {
 -            store.unreserve(tid, Duration.ofMillis(deferTime));
 +          if (txStore != null) {
-             txStore.unreserve(deferTime, TimeUnit.MILLISECONDS);
++            txStore.unreserve(Duration.ofMillis(deferTime));
            }
          }
        }
@@@ -337,55 -263,33 +338,55 @@@
      return store.create();
    }
  
 +  public Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
 +      boolean autoCleanUp, String goalMessage) {
 +
 +    Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey);
 +
 +    return optTxStore.map(txStore -> {
 +      var fateId = txStore.getID();
 +      try {
 +        Preconditions.checkState(txStore.getStatus() == NEW);
 +        seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
 +      } finally {
-         txStore.unreserve(0, MILLISECONDS);
++        txStore.unreserve(Duration.ZERO);
 +      }
 +      return fateId;
 +    });
 +  }
 +
 +  private void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
 +      String goalMessage, FateTxStore<T> txStore) {
 +    if (txStore.top() == null) {
 +      try {
 +        log.info("Seeding {} {}", fateId, goalMessage);
 +        txStore.push(repo);
 +      } catch (StackOverflowException e) {
 +        // this should not happen
 +        throw new IllegalStateException(e);
 +      }
 +    }
 +
 +    if (autoCleanUp) {
 +      txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
 +    }
 +
 +    txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
 +
 +    txStore.setStatus(SUBMITTED);
 +  }
 +
    // start work in the transaction.. it is safe to call this
    // multiple times for a transaction... but it will only seed once
 -  public void seedTransaction(String txName, long tid, Repo<T> repo, boolean autoCleanUp,
 +  public void seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean autoCleanUp,
        String goalMessage) {
 -    store.reserve(tid);
 +    FateTxStore<T> txStore = store.reserve(fateId);
      try {
 -      if (store.getStatus(tid) == NEW) {
 -        if (store.top(tid) == null) {
 -          try {
 -            log.info("Seeding {} {}", FateTxId.formatTid(tid), goalMessage);
 -            store.push(tid, repo);
 -          } catch (StackOverflowException e) {
 -            // this should not happen
 -            throw new IllegalStateException(e);
 -          }
 -        }
 -
 -        if (autoCleanUp) {
 -          store.setTransactionInfo(tid, TxInfo.AUTO_CLEAN, autoCleanUp);
 -        }
 -
 -        store.setTransactionInfo(tid, TxInfo.TX_NAME, txName);
 -
 -        store.setStatus(tid, SUBMITTED);
 +      if (txStore.getStatus() == NEW) {
 +        seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage, txStore);
        }
      } finally {
-       txStore.unreserve(0, TimeUnit.MILLISECONDS);
 -      store.unreserve(tid, Duration.ZERO);
++      txStore.unreserve(Duration.ZERO);
      }
  
    }
@@@ -422,7 -325,7 +423,7 @@@
              return false;
            }
          } finally {
-           txStore.unreserve(0, TimeUnit.MILLISECONDS);
 -          store.unreserve(tid, Duration.ZERO);
++          txStore.unreserve(Duration.ZERO);
          }
        } else {
          // reserved, lets retry.
@@@ -452,34 -356,34 +453,34 @@@
            break;
        }
      } finally {
-       txStore.unreserve(0, TimeUnit.MILLISECONDS);
 -      store.unreserve(tid, Duration.ZERO);
++      txStore.unreserve(Duration.ZERO);
      }
    }
  
 -  public String getReturn(long tid) {
 -    store.reserve(tid);
 +  public String getReturn(FateId fateId) {
 +    FateTxStore<T> txStore = store.reserve(fateId);
      try {
 -      if (store.getStatus(tid) != SUCCESSFUL) {
 -      throw new IllegalStateException("Tried to get exception when transaction "
 -            + FateTxId.formatTid(tid) + " not in successful state");
 +      if (txStore.getStatus() != SUCCESSFUL) {
 +        throw new IllegalStateException(
 +            "Tried to get exception when transaction " + fateId + " not in successful state");
        }
 -      return (String) store.getTransactionInfo(tid, TxInfo.RETURN_VALUE);
 +      return (String) txStore.getTransactionInfo(TxInfo.RETURN_VALUE);
      } finally {
-       txStore.unreserve(0, TimeUnit.MILLISECONDS);
 -      store.unreserve(tid, Duration.ZERO);
++      txStore.unreserve(Duration.ZERO);
      }
    }
  
    // get reportable failures
 -  public Exception getException(long tid) {
 -    store.reserve(tid);
 +  public Exception getException(FateId fateId) {
 +    FateTxStore<T> txStore = store.reserve(fateId);
      try {
 -      if (store.getStatus(tid) != FAILED) {
 -      throw new IllegalStateException("Tried to get exception when transaction "
 -            + FateTxId.formatTid(tid) + " not in failed state");
 +      if (txStore.getStatus() != FAILED) {
 +        throw new IllegalStateException(
 +            "Tried to get exception when transaction " + fateId + " not in failed state");
        }
 -      return (Exception) store.getTransactionInfo(tid, TxInfo.EXCEPTION);
 +      return (Exception) txStore.getTransactionInfo(TxInfo.EXCEPTION);
      } finally {
-       txStore.unreserve(0, TimeUnit.MILLISECONDS);
 -      store.unreserve(tid, Duration.ZERO);
++      txStore.unreserve(Duration.ZERO);
      }
    }
  
diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
index 4e1beb1b9b,0000000000..7dd0339f24
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateCleaner.java
@@@ -1,135 -1,0 +1,134 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.time.Duration;
 +import java.util.EnumSet;
 +import java.util.UUID;
- import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import com.google.common.base.Preconditions;
 +
 +/**
 + * Removes Repos, in the Fate store it tracks, that are in a finished or new state for more than a
 + * configurable time period. This class stores data in the Fate store under the
 + * {@link org.apache.accumulo.core.fate.Fate.TxInfo#TX_AGEOFF} field. The data stored under this
 + * field is used to track fate transactions that are candidates for cleanup.
 + *
 + * <p>
 + * No external time source is used. It starts tracking idle time when it is created.
 + *
 + * <p>
 + * The {@link #ageOff()} method on this class must be called periodically in order for cleanup to
 + * happen.
 + */
 +public class FateCleaner<T> {
 +
 +  public interface TimeSource {
 +    long currentTimeNanos();
 +  }
 +
 +  // Statuses that can be aged off if idle for a prolonged period.
 +  private static final EnumSet<TStatus> AGE_OFF_STATUSES =
 +      EnumSet.of(TStatus.NEW, TStatus.FAILED, TStatus.SUCCESSFUL);
 +
 +  // This is used to determine if age off data was persisted by another instance of this object.
 +  private final UUID instanceId = UUID.randomUUID();
 +
 +  private static final Logger log = LoggerFactory.getLogger(FateCleaner.class);
 +
 +  private final FateStore<T> store;
 +
 +  private final long ageOffTime;
 +  private final TimeSource timeSource;
 +
 +  private static class AgeOffInfo {
 +    final UUID instanceId;
 +    final long setTime;
 +    final TStatus status;
 +
 +    public AgeOffInfo(String ageOffStr) {
 +      var tokens = ageOffStr.split(":");
 +      Preconditions.checkArgument(tokens.length == 3, "Malformed input %s", ageOffStr);
 +      instanceId = UUID.fromString(tokens[0]);
 +      setTime = Long.parseLong(tokens[1]);
 +      status = TStatus.valueOf(tokens[2]);
 +    }
 +
 +    public AgeOffInfo(UUID instanceId, long time, TStatus status) {
 +      this.instanceId = instanceId;
 +      this.setTime = time;
 +      this.status = status;
 +    }
 +
 +    @Override
 +    public String toString() {
 +      return instanceId + ":" + setTime + ":" + status;
 +    }
 +  }
 +
 +  private AgeOffInfo readAgeOffInfo(FateTxStore<T> txStore) {
 +    String ageOffStr = (String) txStore.getTransactionInfo(Fate.TxInfo.TX_AGEOFF);
 +    if (ageOffStr == null) {
 +      return null;
 +    }
 +
 +    return new AgeOffInfo(ageOffStr);
 +  }
 +
 +  private boolean shouldAgeOff(TStatus currStatus, AgeOffInfo ageOffInfo) {
 +    return AGE_OFF_STATUSES.contains(currStatus) && currStatus == ageOffInfo.status
 +        && ageOffInfo.instanceId.equals(instanceId)
 +        && timeSource.currentTimeNanos() - ageOffInfo.setTime >= ageOffTime;
 +  }
 +
 +  public void ageOff() {
 +    store.list().filter(ids -> AGE_OFF_STATUSES.contains(ids.getStatus()))
 +        .forEach(idStatus -> store.tryReserve(idStatus.getFateId()).ifPresent(txStore -> {
 +          try {
 +            AgeOffInfo ageOffInfo = readAgeOffInfo(txStore);
 +            TStatus currStatus = txStore.getStatus();
 +            if (ageOffInfo == null || !ageOffInfo.instanceId.equals(instanceId)
 +                || currStatus != ageOffInfo.status) {
 +              // set or reset the age off info because it does not exist or it exists but is no
 +              // longer valid
 +              var newAgeOffInfo =
 +                  new AgeOffInfo(instanceId, timeSource.currentTimeNanos(), currStatus);
 +              txStore.setTransactionInfo(Fate.TxInfo.TX_AGEOFF, newAgeOffInfo.toString());
 +              log.trace("Set age off data {} {}", idStatus.getFateId(), newAgeOffInfo);
 +            } else if (shouldAgeOff(currStatus, ageOffInfo)) {
 +              txStore.delete();
 +              log.debug("Aged off FATE tx {}", idStatus.getFateId());
 +            }
 +          } finally {
-             txStore.unreserve(0, TimeUnit.MILLISECONDS);
++            txStore.unreserve(Duration.ZERO);
 +          }
 +        }));
 +  }
 +
 +  public FateCleaner(FateStore<T> store, Duration duration, TimeSource timeSource) {
 +    this.store = store;
 +    this.ageOffTime = duration.toNanos();
 +    this.timeSource = timeSource;
 +  }
 +}
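FateCleaner's constructor and its TimeSource interface are shown above; the following is a hypothetical wiring of the cleaner (the store variable, age-off window, and schedule are illustrative, not the values used by Manager.java in this merge):

    // System::nanoTime satisfies TimeSource's single currentTimeNanos() method
    FateCleaner<String> cleaner = new FateCleaner<>(store, Duration.ofDays(7), System::nanoTime);

    // ageOff() must be invoked periodically for any cleanup to happen
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    executor.scheduleWithFixedDelay(cleaner::ageOff, 10, 10, TimeUnit.MINUTES);
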
diff --cc core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 088e502522,0000000000..9aa7dcbbc4
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@@ -1,127 -1,0 +1,127 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.io.Serializable;
++import java.time.Duration;
 +import java.util.Optional;
- import java.util.concurrent.TimeUnit;
 +
 +/**
 + * Transaction Store: a place to save transactions
 + *
 + * A transaction consists of a number of operations. To use, first create a fate transaction id, and
 + * then seed the transaction with an initial operation. An executor service can then execute the
 + * transaction's operation, possibly pushing more operations onto the transaction as each step
 + * successfully completes. If a step fails, the stack can be unwound, undoing each operation.
 + */
 +public interface FateStore<T> extends ReadOnlyFateStore<T> {
 +
 +  /**
 +   * Create a new fate transaction id
 +   *
 +   * @return a new FateId
 +   */
 +  FateId create();
 +
 +  /**
 +   * Creates and reserves a transaction using the given key. If something is already running for the
 +   * given key, then Optional.empty() will be returned. When this returns a non-empty id, it will be
 +   * in the new state.
 +   *
 +   * <p>
 +   * In the case where a process dies in the middle of a call to this method, if a later call is
 +   * made with the same key and it is in the new state, then the FateId for that key will be returned.
 +   * </p>
 +   *
 +   * @throws IllegalStateException when there is an unexpected collision. This can occur if two
 +   *         keys hash to the same FateId or if a random FateId already exists.
 +   */
 +  Optional<FateTxStore<T>> createAndReserve(FateKey fateKey);
 +
 +  /**
 +   * An interface that allows read/write access to the data related to a single fate operation.
 +   */
 +  interface FateTxStore<T> extends ReadOnlyFateTxStore<T> {
 +    @Override
 +    Repo<T> top();
 +
 +    /**
 +     * Update the given transaction with the next operation
 +     *
 +     * @param repo the operation
 +     */
 +    void push(Repo<T> repo) throws StackOverflowException;
 +
 +    /**
 +     * Remove the last pushed operation from the given transaction.
 +     */
 +    void pop();
 +
 +    /**
 +     * Update the state of a given transaction
 +     *
 +     * @param status execution status
 +     */
 +    void setStatus(TStatus status);
 +
 +    /**
 +     * Set transaction-specific information.
 +     *
 +     * @param txInfo name of attribute of a transaction to set.
 +     * @param val transaction data to store
 +     */
 +    void setTransactionInfo(Fate.TxInfo txInfo, Serializable val);
 +
 +    /**
 +     * Remove the transaction from the store.
 +     *
 +     */
 +    void delete();
 +
 +    /**
 +     * Return the given transaction to the store.
 +     *
 +     * upon successful return the store now controls the referenced transaction id. caller should no
 +     * longer interact with it.
 +     *
-      * @param deferTime time in millis to keep this transaction from being returned by
++     * @param deferTime time to keep this transaction from being returned by
 +     *        {@link #runnable(java.util.concurrent.atomic.AtomicBoolean, java.util.function.Consumer)}.
 +     *        Must be non-negative.
 +     */
-     void unreserve(long deferTime, TimeUnit timeUnit);
++    void unreserve(Duration deferTime);
 +  }
 +
 +  /**
 +   * Attempt to reserve the fate transaction.
 +   *
 +   * @param fateId The FateId
 +   * @return an Optional containing the FateTxStore if reserved by this call, or an empty
 +   *         Optional if the transaction was already reserved
 +   */
 +  Optional<FateTxStore<T>> tryReserve(FateId fateId);
 +
 +  /**
 +   * Reserve the fate transaction.
 +   *
 +   * Reserving a fate transaction ensures that nothing else in-process interacting via the same
 +   * instance will be operating on that fate transaction.
 +   *
 +   */
 +  FateTxStore<T> reserve(FateId fateId);
 +
 +}
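To make the createAndReserve contract above concrete, here is an illustrative caller modeled on Fate.seedTransaction in this merge; the store, fateKey, and repo variables are placeholders:

    // an empty Optional means something is already running for this key
    Optional<FateTxStore<String>> opt = store.createAndReserve(fateKey);
    opt.ifPresent(txStore -> {
      try {
        // the status is guaranteed to be NEW here, so seed the first operation
        txStore.push(repo);
        txStore.setStatus(TStatus.SUBMITTED);
      } catch (StackOverflowException e) {
        throw new IllegalStateException(e);
      } finally {
        // always release the reservation; a positive Duration would defer re-scanning by runnable()
        txStore.unreserve(Duration.ZERO);
      }
    });
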
diff --cc core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
index 031a3ece02,0000000000..ac5147d4a9
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
@@@ -1,111 -1,0 +1,111 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import java.io.Serializable;
++import java.time.Duration;
 +import java.util.EnumSet;
 +import java.util.List;
 +import java.util.Optional;
- import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.util.Pair;
 +
 +public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> {
 +  protected final FateStore.FateTxStore<T> wrapped;
 +
 +  public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) {
 +    this.wrapped = wrapped;
 +  }
 +
 +  @Override
-   public void unreserve(long deferTime, TimeUnit timeUnit) {
-     wrapped.unreserve(deferTime, timeUnit);
++  public void unreserve(Duration deferTime) {
++    wrapped.unreserve(deferTime);
 +  }
 +
 +  @Override
 +  public Repo<T> top() {
 +    return wrapped.top();
 +  }
 +
 +  @Override
 +  public void push(Repo<T> repo) throws StackOverflowException {
 +    wrapped.push(repo);
 +  }
 +
 +  @Override
 +  public void pop() {
 +    wrapped.pop();
 +  }
 +
 +  @Override
 +  public FateStore.TStatus getStatus() {
 +    return wrapped.getStatus();
 +  }
 +
 +  @Override
 +  public Optional<FateKey> getKey() {
 +    return wrapped.getKey();
 +  }
 +
 +  @Override
 +  public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
 +    return wrapped.getStatusAndKey();
 +  }
 +
 +  @Override
 +  public void setStatus(FateStore.TStatus status) {
 +    wrapped.setStatus(status);
 +  }
 +
 +  @Override
 +  public FateStore.TStatus waitForStatusChange(EnumSet<FateStore.TStatus> expected) {
 +    return wrapped.waitForStatusChange(expected);
 +  }
 +
 +  @Override
 +  public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) {
 +    wrapped.setTransactionInfo(txInfo, val);
 +  }
 +
 +  @Override
 +  public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
 +    return wrapped.getTransactionInfo(txInfo);
 +  }
 +
 +  @Override
 +  public void delete() {
 +    wrapped.delete();
 +  }
 +
 +  @Override
 +  public long timeCreated() {
 +    return wrapped.timeCreated();
 +  }
 +
 +  @Override
 +  public FateId getID() {
 +    return wrapped.getID();
 +  }
 +
 +  @Override
 +  public List<ReadOnlyRepo<T>> getStack() {
 +    return wrapped.getStack();
 +  }
 +}
diff --cc core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
index 1a5a4fb708,0000000000..eb0d1dc748
mode 100644,000000..100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/FateCleanerTest.java
@@@ -1,274 -1,0 +1,273 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.core.fate;
 +
 +import static java.util.stream.Collectors.toSet;
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +
 +import java.time.Duration;
 +import java.util.Set;
- import java.util.concurrent.TimeUnit;
 +
 +import org.apache.accumulo.core.fate.FateCleaner.TimeSource;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.zookeeper.KeeperException;
 +import org.junit.jupiter.api.Test;
 +
 +public class FateCleanerTest {
 +
 +  private static class TestTimeSource implements TimeSource {
 +    long time = 0;
 +
 +    @Override
 +    public long currentTimeNanos() {
 +      return time;
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testBasic() throws InterruptedException, KeeperException {
 +
 +    TestTimeSource tts = new TestTimeSource();
 +    TestStore testStore = new TestStore();
 +    FateCleaner<String> cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts);
 +
 +    cleaner.ageOff();
 +
 +    FateId fateId1 = testStore.create();
 +    var txStore1 = testStore.reserve(fateId1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
-     txStore1.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore1.unreserve(Duration.ZERO);
 +
 +    cleaner.ageOff();
 +
 +    FateId fateId2 = testStore.create();
 +    var txStore2 = testStore.reserve(fateId2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore2.unreserve(Duration.ZERO);
 +
 +    cleaner.ageOff();
 +
 +    tts.time = 6;
 +
 +    FateId fateId3 = testStore.create();
 +    var txStore3 = testStore.reserve(fateId3);
 +    txStore3.setStatus(TStatus.IN_PROGRESS);
 +    txStore3.setStatus(TStatus.SUCCESSFUL);
-     txStore3.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore3.unreserve(Duration.ZERO);
 +
 +    cleaner.ageOff();
 +
 +    FateId fateId4 = testStore.create();
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    tts.time = 15;
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1, fateId3, fateId4),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    tts.time = 30;
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +  }
 +
 +  @Test
 +  public void testNonEmpty() {
 +    // test age off when source store starts off non empty
 +
 +    TestTimeSource tts = new TestTimeSource();
 +    TestStore testStore = new TestStore();
 +    FateId fateId1 = testStore.create();
 +    var txStore1 = testStore.reserve(fateId1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
-     txStore1.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore1.unreserve(Duration.ZERO);
 +
 +    FateId fateId2 = testStore.create();
 +    var txStore2 = testStore.reserve(fateId2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore2.unreserve(Duration.ZERO);
 +
 +    FateId fateId3 = testStore.create();
 +    var txStore3 = testStore.reserve(fateId3);
 +    txStore3.setStatus(TStatus.IN_PROGRESS);
 +    txStore3.setStatus(TStatus.SUCCESSFUL);
-     txStore3.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore3.unreserve(Duration.ZERO);
 +
 +    FateId fateId4 = testStore.create();
 +
 +    FateCleaner<String> cleaner = new FateCleaner<>(testStore, Duration.ofNanos(10), tts);
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1, fateId2, fateId3, fateId4),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    tts.time = 15;
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    txStore1 = testStore.reserve(fateId1);
 +    txStore1.setStatus(TStatus.FAILED_IN_PROGRESS);
-     txStore1.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore1.unreserve(Duration.ZERO);
 +
 +    tts.time = 30;
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    txStore1 = testStore.reserve(fateId1);
 +    txStore1.setStatus(TStatus.FAILED);
-     txStore1.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore1.unreserve(Duration.ZERO);
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    tts.time = 42;
 +
 +    cleaner.ageOff();
 +
 +    assertEquals(0, testStore.list().count());
 +  }
 +
 +  @Test
 +  public void testStatusChange() {
 +    // this test ensures that if something is eligible for ageoff and its status changes it will no longer
 +    // be eligible
 +
 +    TestTimeSource tts = new TestTimeSource();
 +    TestStore testStore = new TestStore();
 +    FateCleaner<String> cleaner = new FateCleaner<>(testStore, Duration.ofHours(10), tts);
 +
 +    cleaner.ageOff();
 +
 +    // create something in the NEW state
 +    FateId fateId1 = testStore.create();
 +
 +    // create another that is complete
 +    FateId fateId2 = testStore.create();
 +    var txStore2 = testStore.reserve(fateId2);
 +    txStore2.setStatus(TStatus.IN_PROGRESS);
 +    txStore2.setStatus(TStatus.FAILED);
-     txStore2.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore2.unreserve(Duration.ZERO);
 +
 +    // create another in the NEW state
 +    FateId fateId3 = testStore.create();
 +
 +    // start tracking what can age off, both should be candidates
 +    cleaner.ageOff();
 +    assertEquals(Set.of(fateId1, fateId2, fateId3),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    // advance time by 9 hours, nothing should age off
 +    tts.time += Duration.ofHours(9).toNanos();
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1, fateId2, fateId3),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    var txStore1 = testStore.reserve(fateId1);
 +    txStore1.setStatus(TStatus.IN_PROGRESS);
 +    txStore1.setStatus(TStatus.FAILED);
-     txStore1.unreserve(0, TimeUnit.MILLISECONDS);
++    txStore1.unreserve(Duration.ZERO);
 +
 +    // advance time by 2 hours, both should be able to age off.. however the status changed on txid1
 +    // so it should not age off
 +    tts.time += Duration.ofHours(2).toNanos();
 +    cleaner.ageOff();
 +
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    // advance time by 9 hours, nothing should age off
 +    tts.time += Duration.ofHours(9).toNanos();
 +    cleaner.ageOff();
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    // advance time by 2 hours, should age off everything
 +    tts.time += Duration.ofHours(2).toNanos();
 +    cleaner.ageOff();
 +    assertEquals(Set.of(), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +  }
 +
 +  @Test
 +  public void testNewCleaner() {
 +    // this test ensures that a new cleaner instance ignores data from another cleaner instance
 +
 +    TestTimeSource tts = new TestTimeSource();
 +    TestStore testStore = new TestStore();
 +    FateCleaner<String> cleaner1 = new FateCleaner<>(testStore, Duration.ofHours(10), tts);
 +
 +    FateId fateId1 = testStore.create();
 +
 +    cleaner1.ageOff();
 +    assertEquals(Set.of(fateId1), testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    tts.time += Duration.ofHours(5).toNanos();
 +    FateId fateId2 = testStore.create();
 +
 +    cleaner1.ageOff();
 +    assertEquals(Set.of(fateId1, fateId2),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    tts.time += Duration.ofHours(6).toNanos();
 +    FateId fateId3 = testStore.create();
 +
 +    cleaner1.ageOff();
 +    assertEquals(Set.of(fateId2, fateId3),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    // create a new cleaner, it should ignore any data stored by previous cleaner
 +    FateCleaner<String> cleaner2 = new FateCleaner<>(testStore, Duration.ofHours(10), tts);
 +
 +    tts.time += Duration.ofHours(5).toNanos();
 +    // since this is a new cleaner instance, it should reset the clock
 +    cleaner2.ageOff();
 +    assertEquals(Set.of(fateId2, fateId3),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    // since the clock was reset, advancing time should not age anything off
 +    tts.time += Duration.ofHours(9).toNanos();
 +    cleaner2.ageOff();
 +    assertEquals(Set.of(fateId2, fateId3),
 +        testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +
 +    // this should advance time enough to age everything off
 +    tts.time += Duration.ofHours(2).toNanos();
 +    cleaner2.ageOff();
 +    assertEquals(Set.of(), 
testStore.list().map(FateIdStatus::getFateId).collect(toSet()));
 +  }
 +}
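
These tests drive FateCleaner through an injectable time source rather than the wall clock, advancing it manually with Duration...toNanos(). A minimal sketch of that pattern, assuming a hypothetical single-method TimeSource interface (the real interface used by FateCleaner may differ in name and shape):

    // Hypothetical stand-in for the time abstraction the cleaner tests rely on.
    interface TimeSource {
      long currentNanos();
    }

    // Test double in the spirit of TestTimeSource above: time only moves when the
    // test bumps it, e.g. tts.time += Duration.ofHours(9).toNanos();
    class TestTimeSource implements TimeSource {
      long time = 0;

      @Override
      public long currentNanos() {
        return time;
      }
    }
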
diff --cc core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 50046a4b9b,ba06a51ff9..244edd13d9
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@@ -18,25 -18,13 +18,25 @@@
   */
  package org.apache.accumulo.core.fate;
  
 +import java.io.Serializable;
+ import java.time.Duration;
  import java.util.ArrayList;
 +import java.util.EnumSet;
  import java.util.HashMap;
  import java.util.HashSet;
  import java.util.List;
  import java.util.Map;
 +import java.util.Optional;
  import java.util.Set;
- import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.function.Consumer;
 +import java.util.stream.Stream;
 +
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.ReadOnlyFateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.util.Pair;
  
  /**
   * Transient in memory store for transactions.
@@@ -82,118 -61,35 +82,118 @@@ public class TestStore implements FateS
      }
    }
  
 -  @Override
 -  public void unreserve(long tid, Duration deferTime) {
 -    if (!reserved.remove(tid)) {
 -      throw new IllegalStateException();
 +  private class TestFateTxStore implements FateTxStore<String> {
 +
 +    private final FateId fateId;
 +
 +    TestFateTxStore(FateId fateId) {
 +      this.fateId = fateId;
      }
 -  }
  
 -  @Override
 -  public org.apache.accumulo.core.fate.TStore.TStatus getStatus(long tid) {
 -    if (!reserved.contains(tid)) {
 -      throw new IllegalStateException();
 +    @Override
 +    public Repo<String> top() {
 +      throw new UnsupportedOperationException();
      }
  
 -    TStatus status = statuses.get(tid);
 -    if (status == null) {
 -      return TStatus.UNKNOWN;
 +    @Override
 +    public List<ReadOnlyRepo<String>> getStack() {
 +      throw new UnsupportedOperationException();
      }
 -    return status;
 -  }
  
 -  @Override
 -  public void setStatus(long tid, 
org.apache.accumulo.core.fate.TStore.TStatus status) {
 -    if (!reserved.contains(tid)) {
 -      throw new IllegalStateException();
 +    @Override
 +    public TStatus getStatus() {
 +      return getStatusAndKey().getFirst();
 +    }
 +
 +    @Override
 +    public Optional<FateKey> getKey() {
 +      return getStatusAndKey().getSecond();
 +    }
 +
 +    @Override
 +    public Pair<TStatus,Optional<FateKey>> getStatusAndKey() {
 +      if (!reserved.contains(fateId)) {
 +        throw new IllegalStateException();
 +      }
 +
 +      Pair<TStatus,Optional<FateKey>> status = statuses.get(fateId);
 +      if (status == null) {
 +        return new Pair<>(TStatus.UNKNOWN, Optional.empty());
 +      }
 +
 +      return status;
 +    }
 +
 +    @Override
 +    public TStatus waitForStatusChange(EnumSet<TStatus> expected) {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public Serializable getTransactionInfo(Fate.TxInfo txInfo) {
 +      var submap = txInfos.get(fateId);
 +      if (submap == null) {
 +        return null;
 +      }
 +
 +      return submap.get(txInfo);
 +    }
 +
 +    @Override
 +    public long timeCreated() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public FateId getID() {
 +      return fateId;
 +    }
 +
 +    @Override
 +    public void push(Repo<String> repo) throws StackOverflowException {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void pop() {
 +      throw new UnsupportedOperationException();
 +    }
 +
 +    @Override
 +    public void setStatus(TStatus status) {
 +      if (!reserved.contains(fateId)) {
 +        throw new IllegalStateException();
 +      }
 +      Pair<TStatus,Optional<FateKey>> currentStatus = statuses.get(fateId);
 +      if (currentStatus == null) {
 +        throw new IllegalStateException();
 +      }
 +      statuses.put(fateId, new Pair<>(status, currentStatus.getSecond()));
 +    }
 +
 +    @Override
 +    public void setTransactionInfo(Fate.TxInfo txInfo, Serializable val) {
 +      if (!reserved.contains(fateId)) {
 +        throw new IllegalStateException();
 +      }
 +
 +      txInfos.computeIfAbsent(fateId, t -> new HashMap<>()).put(txInfo, val);
 +    }
 +
 +    @Override
 +    public void delete() {
 +      if (!reserved.contains(fateId)) {
 +        throw new IllegalStateException();
 +      }
 +      statuses.remove(fateId);
      }
 -    if (!statuses.containsKey(tid)) {
 -      throw new IllegalStateException();
 +
 +    @Override
-     public void unreserve(long deferTime, TimeUnit timeUnit) {
++    public void unreserve(Duration deferTime) {
 +      if (!reserved.remove(fateId)) {
 +        throw new IllegalStateException();
 +      }
      }
 -    statuses.put(tid, status);
    }
  
    @Override
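
The change threaded through this store (and the other stores in this merge) replaces unreserve(long, TimeUnit) with unreserve(Duration). For call sites that still hold a (long, TimeUnit) pair, the conversion is mechanical; a sketch of an illustrative helper (not part of this commit), using only standard JDK calls:

    import java.time.Duration;
    import java.util.concurrent.TimeUnit;

    final class DeferTimes {
      // Convert a legacy (amount, unit) pair to the Duration now expected by
      // unreserve(); TimeUnit.toChronoUnit() preserves the original precision.
      static Duration toDuration(long amount, TimeUnit unit) {
        return Duration.of(amount, unit.toChronoUnit());
      }
    }

    // e.g. txStore.unreserve(10, TimeUnit.SECONDS) becomes
    //      txStore.unreserve(Duration.ofSeconds(10)), and a zero defer time
    //      becomes txStore.unreserve(Duration.ZERO).
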
diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 45ab2b5e2b,cbbba0fb7f..5f7ce20fad
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -117,10 -112,9 +117,11 @@@ import org.apache.accumulo.core.util.Ha
  import org.apache.accumulo.core.util.Retry;
  import org.apache.accumulo.core.util.threads.ThreadPools;
  import org.apache.accumulo.core.util.threads.Threads;
+ import org.apache.accumulo.core.util.time.NanoTime;
 +import 
org.apache.accumulo.manager.compaction.coordinator.CompactionCoordinator;
  import org.apache.accumulo.manager.metrics.ManagerMetrics;
  import org.apache.accumulo.manager.recovery.RecoveryManager;
 +import org.apache.accumulo.manager.split.Splitter;
  import org.apache.accumulo.manager.state.TableCounts;
  import org.apache.accumulo.manager.tableOps.TraceRepo;
  import org.apache.accumulo.manager.upgrade.PreUpgradeValidation;
@@@ -160,9 -157,8 +161,10 @@@ import org.apache.zookeeper.Watcher
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
  
 +import com.google.common.base.Preconditions;
+ import com.google.common.collect.Comparators;
  import com.google.common.collect.ImmutableSortedMap;
 +import com.google.common.collect.Maps;
  import com.google.common.util.concurrent.RateLimiter;
  import com.google.common.util.concurrent.Uninterruptibles;
  
diff --cc 
test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 16dd39b09f,a1a85e7b1c..e2e7956673
--- 
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@@ -35,10 -39,10 +35,11 @@@ import static org.apache.accumulo.test.
  import static 
org.apache.accumulo.test.util.FileMetadataUtil.splitFilesIntoRanges;
  import static org.junit.jupiter.api.Assertions.assertEquals;
  import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
  import static org.junit.jupiter.api.Assertions.assertTrue;
 -import static org.junit.jupiter.api.Assertions.fail;
  
  import java.io.IOException;
++import java.time.Duration;
  import java.util.ArrayList;
  import java.util.Arrays;
  import java.util.EnumSet;
@@@ -49,15 -52,15 +50,14 @@@ import java.util.Map.Entry
  import java.util.Set;
  import java.util.SortedSet;
  import java.util.TreeSet;
 -import java.util.concurrent.TimeUnit;
 +import java.util.UUID;
- import java.util.concurrent.TimeUnit;
  import java.util.stream.Collectors;
 -import java.util.stream.Stream;
  
 -import org.apache.accumulo.compactor.Compactor;
  import org.apache.accumulo.compactor.ExtCEnv.CompactorIterEnv;
 -import org.apache.accumulo.coordinator.CompactionCoordinator;
 +import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.Accumulo;
  import org.apache.accumulo.core.client.AccumuloClient;
 +import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.BatchWriter;
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.Scanner;
@@@ -225,191 -203,6 +225,191 @@@ public class ExternalCompaction_1_IT ex
      }
    }
  
 +  /**
 +   * This test verifies the dead compaction detector does not remove compactions that are
 +   * committing in fate for the Root table.
 +   */
 +  @Test
 +  public void testCompactionCommitAndDeadDetectionRoot() throws Exception {
 +    var ctx = getCluster().getServerContext();
 +    FateStore<Manager> zkStore =
 +        new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, 
ctx.getZooReaderWriter());
 +
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      var tableId = ctx.getTableId(AccumuloTable.ROOT.tableName());
 +      var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
 +      var fateId = createCompactionCommitAndDeadMetadata(c, zkStore, 
AccumuloTable.ROOT.tableName(),
 +          allCids);
 +      verifyCompactionCommitAndDead(zkStore, tableId, fateId, 
allCids.get(tableId));
 +    }
 +  }
 +
 +  /**
 +   * This test verifies the dead compaction detector does not remove compactions that are
 +   * committing in fate for the Metadata table.
 +   */
 +  @Test
 +  public void testCompactionCommitAndDeadDetectionMeta() throws Exception {
 +    var ctx = getCluster().getServerContext();
 +    FateStore<Manager> zkStore =
 +        new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, 
ctx.getZooReaderWriter());
 +
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      // Metadata table by default already has 2 tablets
 +      var tableId = ctx.getTableId(AccumuloTable.METADATA.tableName());
 +      var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
 +      var fateId = createCompactionCommitAndDeadMetadata(c, zkStore,
 +          AccumuloTable.METADATA.tableName(), allCids);
 +      verifyCompactionCommitAndDead(zkStore, tableId, fateId, 
allCids.get(tableId));
 +    }
 +  }
 +
 +  /**
 +   * This test verifies the dead compaction detector does not remove compactions that are
 +   * committing in fate for a User table.
 +   */
 +  @Test
 +  public void testCompactionCommitAndDeadDetectionUser() throws Exception {
 +    var ctx = getCluster().getServerContext();
 +    final String tableName = getUniqueNames(1)[0];
 +
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx);
 +      SortedSet<Text> splits = new TreeSet<>();
 +      splits.add(new Text(row(MAX_DATA / 2)));
 +      c.tableOperations().create(tableName, new 
NewTableConfiguration().withSplits(splits));
 +      writeData(c, tableName);
 +
 +      var tableId = ctx.getTableId(tableName);
 +      var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
 +      var fateId = createCompactionCommitAndDeadMetadata(c, accumuloStore, 
tableName, allCids);
 +      verifyCompactionCommitAndDead(accumuloStore, tableId, fateId, 
allCids.get(tableId));
 +    }
 +  }
 +
 +  /**
 +   * This test verifies the dead compaction detector does not remove compactions that are
 +   * committing in fate when all data levels have compactions.
 +   */
 +  @Test
 +  public void testCompactionCommitAndDeadDetectionAll() throws Exception {
 +    var ctx = getCluster().getServerContext();
 +    final String userTable = getUniqueNames(1)[0];
 +
 +    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProps()).build()) {
 +      AccumuloStore<Manager> accumuloStore = new AccumuloStore<>(ctx);
 +      FateStore<Manager> zkStore =
 +          new ZooStore<>(ctx.getZooKeeperRoot() + Constants.ZFATE, 
ctx.getZooReaderWriter());
 +
 +      SortedSet<Text> splits = new TreeSet<>();
 +      splits.add(new Text(row(MAX_DATA / 2)));
 +      c.tableOperations().create(userTable, new 
NewTableConfiguration().withSplits(splits));
 +      writeData(c, userTable);
 +
 +      Map<TableId,FateId> fateIds = new HashMap<>();
 +      Map<TableId,List<ExternalCompactionId>> allCids = new HashMap<>();
 +
 +      // create compaction metadata for each data level to test
 +      for (String tableName : List.of(AccumuloTable.ROOT.tableName(),
 +          AccumuloTable.METADATA.tableName(), userTable)) {
 +        var tableId = ctx.getTableId(tableName);
 +        var fateStore = FateInstanceType.fromTableId(tableId) == 
FateInstanceType.USER
 +            ? accumuloStore : zkStore;
 +        fateIds.put(tableId,
 +            createCompactionCommitAndDeadMetadata(c, fateStore, tableName, 
allCids));
 +      }
 +
 +      // verify the dead compaction was removed for each level
 +      // but not the compaction associated with a fate id
 +      for (Entry<TableId,FateId> entry : fateIds.entrySet()) {
 +        var tableId = entry.getKey();
 +        var fateStore = FateInstanceType.fromTableId(tableId) == 
FateInstanceType.USER
 +            ? accumuloStore : zkStore;
 +        verifyCompactionCommitAndDead(fateStore, tableId, entry.getValue(), 
allCids.get(tableId));
 +      }
 +    }
 +  }
 +
 +  private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c,
 +      FateStore<Manager> fateStore, String tableName,
 +      Map<TableId,List<ExternalCompactionId>> allCids) throws Exception {
 +    var ctx = getCluster().getServerContext();
 +    c.tableOperations().flush(tableName, null, null, true);
 +    var tableId = ctx.getTableId(tableName);
 +
 +    allCids.put(tableId, 
List.of(ExternalCompactionId.generate(UUID.randomUUID()),
 +        ExternalCompactionId.generate(UUID.randomUUID())));
 +
 +    // Create a fate transaction for one of the compaction ids that is in the NEW state; it
 +    // should never run. Its purpose is to prevent the dead compaction detector from
 +    // deleting the id.
 +    FateStore.FateTxStore<Manager> fateTx = fateStore
 +        
.createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow();
 +    var fateId = fateTx.getID();
-     fateTx.unreserve(0, TimeUnit.MILLISECONDS);
++    fateTx.unreserve(Duration.ZERO);
 +
 +    // Read the tablet metadata
 +    var tabletsMeta = 
ctx.getAmple().readTablets().forTable(tableId).build().stream()
 +        .collect(Collectors.toList());
 +    // Root is always 1 tablet
 +    if (!tableId.equals(AccumuloTable.ROOT.tableId())) {
 +      assertEquals(2, tabletsMeta.size());
 +    }
 +
 +    // Insert fake compaction entries in the metadata table. No compactor will report
 +    // ownership of these, so they should look like dead compactions and be removed.
 +    // However, one of them has an associated fate tx that should prevent its removal.
 +    try (var mutator = ctx.getAmple().mutateTablets()) {
 +      for (int i = 0; i < tabletsMeta.size(); i++) {
 +        var tabletMeta = tabletsMeta.get(0);
 +        var tabletDir =
 +            
tabletMeta.getFiles().stream().findFirst().orElseThrow().getPath().getParent();
 +        var tmpFile = new Path(tabletDir, "C1234.rf_tmp");
 +
 +        CompactionMetadata cm = new CompactionMetadata(tabletMeta.getFiles(),
 +            ReferencedTabletFile.of(tmpFile), "localhost:16789", 
CompactionKind.SYSTEM, (short) 10,
 +            CompactorGroupId.of(GROUP1), false, null);
 +
 +        mutator.mutateTablet(tabletMeta.getExtent())
 +            .putExternalCompaction(allCids.get(tableId).get(i), cm).mutate();
 +      }
 +    }
 +
 +    return fateId;
 +  }
 +
 +  private void verifyCompactionCommitAndDead(FateStore<Manager> fateStore, 
TableId tableId,
 +      FateId fateId, List<ExternalCompactionId> cids) {
 +    var ctx = getCluster().getServerContext();
 +
 +    // Wait until the compaction id w/o a fate transaction is removed; should still see the
 +    // one with a fate transaction
 +    Wait.waitFor(() -> {
 +      Set<ExternalCompactionId> currentIds = 
ctx.getAmple().readTablets().forTable(tableId).build()
 +          .stream().map(TabletMetadata::getExternalCompactions)
 +          .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
 +      System.out.println("currentIds1:" + currentIds);
 +      assertTrue(currentIds.size() == 1 || currentIds.size() == 2);
 +      return currentIds.equals(Set.of(cids.get(0)));
 +    });
 +
 +    // Delete the fate transaction; this should allow the dead compaction detector to clean
 +    // up the remaining external compaction id
 +    var fateTx = fateStore.reserve(fateId);
 +    fateTx.delete();
-     fateTx.unreserve(0, TimeUnit.MILLISECONDS);
++    fateTx.unreserve(Duration.ZERO);
 +
 +    // wait for the remaining compaction id to be removed
 +    Wait.waitFor(() -> {
 +      Set<ExternalCompactionId> currentIds = 
ctx.getAmple().readTablets().forTable(tableId).build()
 +          .stream().map(TabletMetadata::getExternalCompactions)
 +          .flatMap(ecm -> ecm.keySet().stream()).collect(Collectors.toSet());
 +      System.out.println("currentIds2:" + currentIds);
 +      assertTrue(currentIds.size() <= 1);
 +      return currentIds.isEmpty();
 +    });
 +  }
 +
    @Test
    public void testCompactionAndCompactorDies() throws Exception {
      String table1 = this.getUniqueNames(1)[0];
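
The verification above polls tablet metadata with Wait.waitFor instead of sleeping for a fixed interval. A generic sketch of that polling idiom, with timeout and retry values chosen purely for illustration (the repository's Wait utility may differ in signature and defaults):

    import java.util.function.BooleanSupplier;

    final class Poll {
      // Poll a condition until it holds or the deadline passes; a simplified
      // stand-in for the Wait.waitFor calls in the tests above.
      static void waitFor(BooleanSupplier condition, long timeoutMillis, long retryMillis)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMillis;
        while (!condition.getAsBoolean()) {
          if (System.currentTimeMillis() > deadline) {
            throw new IllegalStateException("timed out waiting for condition");
          }
          Thread.sleep(retryMillis);
        }
      }
    }
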
diff --cc 
test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
index 63e8d64703,0000000000..9b43c9c2b9
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/accumulo/FateStoreIT.java
@@@ -1,511 -1,0 +1,510 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *   https://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing,
 + * software distributed under the License is distributed on an
 + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 + * KIND, either express or implied.  See the License for the
 + * specific language governing permissions and limitations
 + * under the License.
 + */
 +package org.apache.accumulo.test.fate.accumulo;
 +
 +import static org.junit.jupiter.api.Assertions.assertEquals;
 +import static org.junit.jupiter.api.Assertions.assertFalse;
 +import static org.junit.jupiter.api.Assertions.assertInstanceOf;
 +import static org.junit.jupiter.api.Assertions.assertNotEquals;
 +import static org.junit.jupiter.api.Assertions.assertNotNull;
 +import static org.junit.jupiter.api.Assertions.assertNull;
 +import static org.junit.jupiter.api.Assertions.assertThrows;
 +import static org.junit.jupiter.api.Assertions.assertTrue;
 +
 +import java.lang.reflect.Method;
 +import java.time.Duration;
 +import java.util.HashMap;
 +import java.util.HashSet;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.Optional;
 +import java.util.Set;
 +import java.util.UUID;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
- import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.stream.Collectors;
 +
 +import org.apache.accumulo.core.data.TableId;
 +import org.apache.accumulo.core.dataImpl.KeyExtent;
 +import org.apache.accumulo.core.fate.AbstractFateStore;
 +import org.apache.accumulo.core.fate.Fate.TxInfo;
 +import org.apache.accumulo.core.fate.FateId;
 +import org.apache.accumulo.core.fate.FateKey;
 +import org.apache.accumulo.core.fate.FateStore;
 +import org.apache.accumulo.core.fate.FateStore.FateTxStore;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.FateIdStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
 +import org.apache.accumulo.core.fate.ReadOnlyRepo;
 +import org.apache.accumulo.core.fate.StackOverflowException;
 +import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
 +import org.apache.accumulo.harness.SharedMiniClusterBase;
 +import org.apache.accumulo.server.ServerContext;
 +import org.apache.accumulo.test.fate.FateIT.TestRepo;
 +import org.apache.accumulo.test.fate.FateTestRunner;
 +import org.apache.accumulo.test.fate.FateTestRunner.TestEnv;
 +import org.apache.accumulo.test.util.Wait;
 +import org.apache.hadoop.io.Text;
 +import org.junit.jupiter.api.Test;
 +
 +import com.google.common.base.Throwables;
 +
 +public abstract class FateStoreIT extends SharedMiniClusterBase implements 
FateTestRunner<TestEnv> {
 +
 +  private static final Method fsCreateByKeyMethod;
 +
 +  static {
 +    try {
 +      // Private method, need to capture for testing
 +      fsCreateByKeyMethod = 
AbstractFateStore.class.getDeclaredMethod("create", FateKey.class);
 +      fsCreateByKeyMethod.setAccessible(true);
 +    } catch (NoSuchMethodException e) {
 +      throw new RuntimeException(e);
 +    }
 +  }
 +
 +  @Override
 +  protected Duration defaultTimeout() {
 +    return Duration.ofMinutes(1);
 +  }
 +
 +  @Test
 +  public void testReadWrite() throws Exception {
 +    executeTest(this::testReadWrite);
 +  }
 +
 +  protected void testReadWrite(FateStore<TestEnv> store, ServerContext sctx)
 +      throws StackOverflowException {
 +    // Verify no transactions
 +    assertEquals(0, store.list().count());
 +
 +    // Create a new transaction and get the store for it
 +    FateId fateId = store.create();
 +    FateTxStore<TestEnv> txStore = store.reserve(fateId);
 +    assertTrue(txStore.timeCreated() > 0);
 +    assertFalse(txStore.getKey().isPresent());
 +    assertEquals(1, store.list().count());
 +
 +    // Push a test FATE op and verify we can read it back
 +    txStore.setStatus(TStatus.IN_PROGRESS);
 +    txStore.push(new TestRepo("testOp"));
 +    TestRepo op = (TestRepo) txStore.top();
 +    assertNotNull(op);
 +
 +    // Test status
 +    txStore.setStatus(TStatus.SUBMITTED);
 +    assertEquals(TStatus.SUBMITTED, txStore.getStatus());
 +
 +    // Set a name to test setTransactionInfo()
 +    txStore.setTransactionInfo(TxInfo.TX_NAME, "name");
 +    assertEquals("name", txStore.getTransactionInfo(TxInfo.TX_NAME));
 +
 +    // Try setting a second test op to test getStack()
 +    // when listing or popping TestOperation2 should be first
 +    assertEquals(1, txStore.getStack().size());
 +    txStore.setStatus(TStatus.IN_PROGRESS);
 +    txStore.push(new TestOperation2());
 +    // test top returns TestOperation2
 +    ReadOnlyRepo<TestEnv> top = txStore.top();
 +    assertInstanceOf(TestOperation2.class, top);
 +
 +    // test get stack
 +    List<ReadOnlyRepo<TestEnv>> ops = txStore.getStack();
 +    assertEquals(2, ops.size());
 +    assertInstanceOf(TestOperation2.class, ops.get(0));
 +    assertEquals(TestRepo.class, ops.get(1).getClass());
 +
 +    // test pop, TestOperation should be left
 +    txStore.setStatus(TStatus.FAILED_IN_PROGRESS); // needed to satisfy the condition on pop
 +    txStore.pop();
 +    ops = txStore.getStack();
 +    assertEquals(1, ops.size());
 +    assertEquals(TestRepo.class, ops.get(0).getClass());
 +
 +    // create second
 +    FateTxStore<TestEnv> txStore2 = store.reserve(store.create());
 +    assertEquals(2, store.list().count());
 +
 +    // test delete
 +    txStore.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition on delete
 +    txStore.delete();
 +    assertEquals(1, store.list().count());
 +    txStore2.setStatus(TStatus.SUCCESSFUL); // needed to satisfy the condition on delete
 +    txStore2.delete();
 +    assertEquals(0, store.list().count());
 +  }
 +
 +  @Test
 +  public void testReadWriteTxInfo() throws Exception {
 +    executeTest(this::testReadWriteTxInfo);
 +  }
 +
 +  protected void testReadWriteTxInfo(FateStore<TestEnv> store, ServerContext 
sctx) {
 +    FateId fateId = store.create();
 +    FateTxStore<TestEnv> txStore = store.reserve(fateId);
 +
 +    try {
 +      // Go through all enum values to verify each TxInfo type will be properly
 +      // written and read from the store
 +      for (TxInfo txInfo : TxInfo.values()) {
 +        assertNull(txStore.getTransactionInfo(txInfo));
 +        txStore.setTransactionInfo(txInfo, "value: " + txInfo.name());
 +        assertEquals("value: " + txInfo.name(), 
txStore.getTransactionInfo(txInfo));
 +      }
 +    } finally {
 +      txStore.delete();
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testDeferredOverflow() throws Exception {
 +    executeTest(this::testDeferredOverflow, 10, 
AbstractFateStore.DEFAULT_FATE_ID_GENERATOR);
 +  }
 +
 +  protected void testDeferredOverflow(FateStore<TestEnv> store, ServerContext 
sctx)
 +      throws Exception {
 +    // Verify no transactions
 +    assertEquals(0, store.list().count());
 +    assertFalse(store.isDeferredOverflow());
 +
 +    // Store 10 transactions that are all deferred
 +    final Set<FateId> transactions = new HashSet<>();
 +    for (int i = 0; i < 10; i++) {
 +      FateId fateId = store.create();
 +      transactions.add(fateId);
 +      FateTxStore<TestEnv> txStore = store.reserve(fateId);
 +      txStore.setStatus(TStatus.SUBMITTED);
 +      assertTrue(txStore.timeCreated() > 0);
-       txStore.unreserve(10, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ofSeconds(10));
 +    }
 +
 +    // Verify we have 10 transactions and all are deferred
 +    assertEquals(10, store.list().count());
 +    assertEquals(10, store.getDeferredCount());
 +
 +    // Should still be false as we are at the max but not over yet
 +    assertFalse(store.isDeferredOverflow());
 +
 +    var executor = Executors.newCachedThreadPool();
 +    Future<?> future;
 +    AtomicBoolean keepRunning = new AtomicBoolean(true);
 +    try {
 +      // Run and verify all 10 transactions still exist and were not
 +      // run because of the deferral time of all the transactions
 +      future = executor.submit(() -> store.runnable(keepRunning, 
transactions::remove));
 +      Thread.sleep(2000);
 +      assertEquals(10, transactions.size());
 +      // Setting this flag to false should terminate the task if sleeping
 +      keepRunning.set(false);
 +      // wait for the future to finish to verify the task finished
 +      future.get();
 +
 +      // Store one more that should go over the max deferred of 10
 +      // and should clear the map and set the overflow flag
 +      FateId fateId = store.create();
 +      transactions.add(fateId);
 +      FateTxStore<TestEnv> txStore = store.reserve(fateId);
 +      txStore.setStatus(TStatus.SUBMITTED);
-       txStore.unreserve(30, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ofSeconds(30));
 +
 +      // Verify we have 11 transactions stored and none
 +      // deferred anymore because of the overflow
 +      assertEquals(11, store.list().count());
 +      assertEquals(0, store.getDeferredCount());
 +      assertTrue(store.isDeferredOverflow());
 +
 +      // Run and verify all 11 transactions were processed
 +      // and removed from the store
 +      keepRunning.set(true);
 +      future = executor.submit(() -> store.runnable(keepRunning, 
transactions::remove));
 +      Wait.waitFor(transactions::isEmpty);
 +      // Setting this flag to false should terminate the task if sleeping
 +      keepRunning.set(false);
 +      // wait for the future to finish to verify the task finished
 +      future.get();
 +
 +      // Overflow should now be reset to false so adding another deferred
 +      // transaction should now go back into the deferral map and flag should
 +      // still be false as we are under the limit
 +      assertFalse(store.isDeferredOverflow());
 +      txStore = store.reserve(store.create());
-       txStore.unreserve(30, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ofSeconds(30));
 +      assertEquals(1, store.getDeferredCount());
 +      assertFalse(store.isDeferredOverflow());
 +    } finally {
 +      executor.shutdownNow();
 +      // Cleanup so we don't interfere with other tests
 +      // All stores should already be unreserved
 +      store.list().forEach(
 +          fateIdStatus -> 
store.tryReserve(fateIdStatus.getFateId()).orElseThrow().delete());
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateWithKey() throws Exception {
 +    executeTest(this::testCreateWithKey);
 +  }
 +
 +  protected void testCreateWithKey(FateStore<TestEnv> store, ServerContext 
sctx) {
 +    KeyExtent ke1 =
 +        new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
 +
 +    long existing = store.list().count();
 +    FateKey fateKey1 = FateKey.forSplit(ke1);
 +    FateKey fateKey2 =
 +        
FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID()));
 +
 +    FateTxStore<TestEnv> txStore1 = 
store.createAndReserve(fateKey1).orElseThrow();
 +    FateTxStore<TestEnv> txStore2 = 
store.createAndReserve(fateKey2).orElseThrow();
 +
 +    assertNotEquals(txStore1.getID(), txStore2.getID());
 +
 +    try {
 +      assertTrue(txStore1.timeCreated() > 0);
 +      assertEquals(TStatus.NEW, txStore1.getStatus());
 +      assertEquals(fateKey1, txStore1.getKey().orElseThrow());
 +
 +      assertTrue(txStore2.timeCreated() > 0);
 +      assertEquals(TStatus.NEW, txStore2.getStatus());
 +      assertEquals(fateKey2, txStore2.getKey().orElseThrow());
 +
 +      assertEquals(existing + 2, store.list().count());
 +    } finally {
 +      txStore1.delete();
 +      txStore2.delete();
-       txStore1.unreserve(0, TimeUnit.SECONDS);
-       txStore2.unreserve(0, TimeUnit.SECONDS);
++      txStore1.unreserve(Duration.ZERO);
++      txStore2.unreserve(Duration.ZERO);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateWithKeyDuplicate() throws Exception {
 +    executeTest(this::testCreateWithKeyDuplicate);
 +  }
 +
 +  protected void testCreateWithKeyDuplicate(FateStore<TestEnv> store, 
ServerContext sctx) {
 +    KeyExtent ke =
 +        new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
 +
 +    // Creating with the same key should be fine if the status is NEW
 +    // A second call to createAndReserve() should just return an empty optional
 +    // since it's already reserved and in progress
 +    FateKey fateKey = FateKey.forSplit(ke);
 +    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey).orElseThrow();
 +
 +    // second call is empty
 +    assertTrue(store.createAndReserve(fateKey).isEmpty());
 +
 +    try {
 +      assertTrue(txStore.timeCreated() > 0);
 +      assertEquals(TStatus.NEW, txStore.getStatus());
 +      assertEquals(fateKey, txStore.getKey().orElseThrow());
 +      assertEquals(1, store.list().count());
 +    } finally {
 +      txStore.delete();
-       txStore.unreserve(0, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ZERO);
 +    }
 +  }
 +
 +  @Test
 +  public void testCreateWithKeyInProgress() throws Exception {
 +    executeTest(this::testCreateWithKeyInProgress);
 +  }
 +
 +  protected void testCreateWithKeyInProgress(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    KeyExtent ke =
 +        new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
 +    FateKey fateKey = FateKey.forSplit(ke);
 +
 +    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey).orElseThrow();
 +
 +    try {
 +      assertTrue(txStore.timeCreated() > 0);
 +      txStore.setStatus(TStatus.IN_PROGRESS);
 +
 +      // We have an existing transaction with the same key in progress
 +      // so should return an empty Optional
 +      assertTrue(create(store, fateKey).isEmpty());
 +      assertEquals(TStatus.IN_PROGRESS, txStore.getStatus());
 +    } finally {
 +      txStore.setStatus(TStatus.SUCCESSFUL);
 +      txStore.delete();
-       txStore.unreserve(0, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ZERO);
 +    }
 +
 +    try {
 +      // After deletion, make sure we can create again with the same key
 +      txStore = store.createAndReserve(fateKey).orElseThrow();
 +      assertTrue(txStore.timeCreated() > 0);
 +      assertEquals(TStatus.NEW, txStore.getStatus());
 +    } finally {
 +      txStore.delete();
-       txStore.unreserve(0, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ZERO);
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testCreateWithKeyCollision() throws Exception {
 +    // Replace the default hashing algorithm with one that always returns the same tid so
 +    // we can check duplicate detection with different keys
 +    executeTest(this::testCreateWithKeyCollision, 
AbstractFateStore.DEFAULT_MAX_DEFERRED,
 +        (instanceType, fateKey) -> FateId.from(instanceType, 1000));
 +  }
 +
 +  protected void testCreateWithKeyCollision(FateStore<TestEnv> store, 
ServerContext sctx) {
 +    String[] tables = getUniqueNames(2);
 +    KeyExtent ke1 = new KeyExtent(TableId.of(tables[0]), new Text("zzz"), new 
Text("aaa"));
 +    KeyExtent ke2 = new KeyExtent(TableId.of(tables[1]), new Text("ddd"), new 
Text("bbb"));
 +
 +    FateKey fateKey1 = FateKey.forSplit(ke1);
 +    FateKey fateKey2 = FateKey.forSplit(ke2);
 +
 +    FateTxStore<TestEnv> txStore = 
store.createAndReserve(fateKey1).orElseThrow();
 +    try {
 +      var e = assertThrows(IllegalStateException.class, () -> create(store, 
fateKey2));
 +      assertEquals("Collision detected for tid 1000", e.getMessage());
 +      assertEquals(fateKey1, txStore.getKey().orElseThrow());
 +    } finally {
 +      txStore.delete();
-       txStore.unreserve(0, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ZERO);
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testCollisionWithRandomFateId() throws Exception {
 +    executeTest(this::testCollisionWithRandomFateId);
 +  }
 +
 +  protected void testCollisionWithRandomFateId(FateStore<TestEnv> store, 
ServerContext sctx)
 +      throws Exception {
 +    KeyExtent ke =
 +        new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new 
Text("aaa"));
 +
 +    FateKey fateKey = FateKey.forSplit(ke);
 +    FateId fateId = create(store, fateKey).orElseThrow();
 +
 +    // After creating a fate transaction using a key we can simulate a collision with
 +    // a random FateId by deleting the key out of Fate and calling create again to verify
 +    // it detects the key is missing. Then we can continue and see if we can still reserve
 +    // and use the existing transaction, which we should.
 +    deleteKey(fateId, sctx);
 +    var e = assertThrows(IllegalStateException.class, () -> 
store.createAndReserve(fateKey));
 +    assertEquals("Tx Key is missing from tid " + fateId.getTid(), 
e.getMessage());
 +
 +    // We should still be able to reserve and continue when not using a key
 +    // just like a normal transaction
 +    FateTxStore<TestEnv> txStore = store.reserve(fateId);
 +    try {
 +      assertTrue(txStore.timeCreated() > 0);
 +      assertEquals(TStatus.NEW, txStore.getStatus());
 +    } finally {
 +      txStore.delete();
-       txStore.unreserve(0, TimeUnit.SECONDS);
++      txStore.unreserve(Duration.ZERO);
 +    }
 +
 +  }
 +
 +  @Test
 +  public void testListFateKeys() throws Exception {
 +    executeTest(this::testListFateKeys);
 +  }
 +
 +  protected void testListFateKeys(FateStore<TestEnv> store, ServerContext 
sctx) throws Exception {
 +
 +    // this should not be seen when listing by key type because it has no key
 +    var id1 = store.create();
 +
 +    TableId tid1 = TableId.of("test");
 +    var extent1 = new KeyExtent(tid1, new Text("m"), null);
 +    var extent2 = new KeyExtent(tid1, null, new Text("m"));
 +    var fateKey1 = FateKey.forSplit(extent1);
 +    var fateKey2 = FateKey.forSplit(extent2);
 +
 +    var cid1 = ExternalCompactionId.generate(UUID.randomUUID());
 +    var cid2 = ExternalCompactionId.generate(UUID.randomUUID());
 +
 +    assertNotEquals(cid1, cid2);
 +
 +    var fateKey3 = FateKey.forCompactionCommit(cid1);
 +    var fateKey4 = FateKey.forCompactionCommit(cid2);
 +
 +    Map<FateKey,FateId> fateKeyIds = new HashMap<>();
 +    for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) {
 +      var fateTx = store.createAndReserve(fateKey).orElseThrow();
 +      fateKeyIds.put(fateKey, fateTx.getID());
-       fateTx.unreserve(0, TimeUnit.MILLISECONDS);
++      fateTx.unreserve(Duration.ZERO);
 +    }
 +
 +    HashSet<FateId> allIds = new HashSet<>();
 +    allIds.addAll(fateKeyIds.values());
 +    allIds.add(id1);
 +    assertEquals(allIds, 
store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
 +    assertEquals(5, allIds.size());
 +
 +    assertEquals(4, fateKeyIds.size());
 +    assertEquals(4, fateKeyIds.values().stream().distinct().count());
 +
 +    HashSet<KeyExtent> seenExtents = new HashSet<>();
 +    store.list(FateKey.FateKeyType.SPLIT).forEach(fateKey -> {
 +      assertEquals(FateKey.FateKeyType.SPLIT, fateKey.getType());
 +      assertNotNull(fateKeyIds.remove(fateKey));
 +      assertTrue(seenExtents.add(fateKey.getKeyExtent().orElseThrow()));
 +    });
 +
 +    assertEquals(2, fateKeyIds.size());
 +    assertEquals(Set.of(extent1, extent2), seenExtents);
 +
 +    HashSet<ExternalCompactionId> seenCids = new HashSet<>();
 +    store.list(FateKey.FateKeyType.COMPACTION_COMMIT).forEach(fateKey -> {
 +      assertEquals(FateKey.FateKeyType.COMPACTION_COMMIT, fateKey.getType());
 +      assertNotNull(fateKeyIds.remove(fateKey));
 +      assertTrue(seenCids.add(fateKey.getCompactionId().orElseThrow()));
 +    });
 +
 +    assertEquals(0, fateKeyIds.size());
 +    assertEquals(Set.of(cid1, cid2), seenCids);
 +  }
 +
 +  // create(fateKey) method is private, so expose it for testing to check error states
 +  @SuppressWarnings("unchecked")
 +  protected Optional<FateId> create(FateStore<TestEnv> store, FateKey 
fateKey) throws Exception {
 +    try {
 +      return (Optional<FateId>) fsCreateByKeyMethod.invoke(store, fateKey);
 +    } catch (Exception e) {
 +      Exception rootCause = (Exception) Throwables.getRootCause(e);
 +      throw rootCause;
 +    }
 +  }
 +
 +  protected abstract void deleteKey(FateId fateId, ServerContext sctx);
 +
 +  private static class TestOperation2 extends TestRepo {
 +
 +    private static final long serialVersionUID = 1L;
 +
 +    public TestOperation2() {
 +      super("testOperation2");
 +    }
 +  }
 +
 +}
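
A recurring shape in these tests is reserve, mutate, then release with a zero defer time so the transaction is immediately runnable again. Sketched in isolation (FateStore, FateTxStore and TStatus are the repository's types; the surrounding store and TestEnv are assumed to be in scope):

    FateId fateId = store.create();
    FateTxStore<TestEnv> txStore = store.reserve(fateId);
    try {
      txStore.setStatus(TStatus.SUBMITTED);
      // ... exercise the transaction under test ...
    } finally {
      // Duration.ZERO releases the reservation without deferring the transaction
      txStore.unreserve(Duration.ZERO);
    }
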

