keith-turner commented on code in PR #4524:
URL: https://github.com/apache/accumulo/pull/4524#discussion_r1631616426


##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -315,6 +420,30 @@ public List<ReadOnlyRepo<T>> getStack() {
         return dops;
       }
     }
+
+    @Override
+    protected void unreserve() {
+      try {
+        if (!this.deleted) {
+          zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+            NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+            FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+            if ((currNodeVal.isReserved()
+                && currNodeVal.reservation.orElseThrow().equals(this.reservation))) {
+              // Remove the FateReservation from the NodeValue to unreserve
+              return new NodeValue(currNodeVal.status, null, currFateKey).serialize();
+            } else {
+              // possible this is running a 2nd time in zk server fault conditions and its first

Review Comment:
   Could add some logging here. If the reservation in ZK is not set, could log something at 
trace. If there is a reservation in ZK that differs, could log something at debug. These logs may 
be useful later for debugging problems when initially using the elasticity changes.



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -421,14 +548,30 @@ private Optional<FateKey> deserializeFateKey(DataInputBuffer buffer) throws IOEx
       return Optional.empty();
     }
 
+    private Optional<FateReservation> deserializeFateReservation(DataInputBuffer buffer)
+        throws IOException {
+      int length = buffer.readInt();
+      if (length > 0) {
+        return Optional.of(FateReservation.deserialize(buffer.readNBytes(length)));
+      }
+      return Optional.empty();
+    }
+
     byte[] serialize() {
       try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
           DataOutputStream dos = new DataOutputStream(baos)) {
         dos.writeUTF(status.name());
+        if (isReserved()) {

Review Comment:
   Not a change to make in this PR. I am thinking that if we push all of the Fate data 
into a single node, it would be nice to use JSON. That would make the data 
stored in ZK more human readable and easier to serialize and deserialize.



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -224,9 +323,14 @@ public void setStatus(TStatus status) {
       verifyReserved(true);
 
       try {
-        zk.putPersistentData(getTXPath(fateId), new NodeValue(status).serialize(),
-            NodeExistsPolicy.OVERWRITE);
-      } catch (KeeperException | InterruptedException e) {
+        zk.mutateExisting(getTXPath(fateId), currSerializedData -> {
+          NodeValue currNodeVal = new NodeValue(currSerializedData);

Review Comment:
   Somewhere in this lambda it should check that a reservation exists for the ZK node 
and that the reservation on the ZK node is equal to 
AbstractFateTxStoreImpl.reservation. If those conditions are not met, then 
throw an exception. This check ensures that the expected state we have in 
memory aligns with what is in the persistent store.
   
   Ideally these kinds of checks would be done for all updates; however, it is 
not possible to atomically do these checks for data stored in other ZK nodes, 
and FATE stores its data in ZK across multiple nodes. The schema could be 
changed to store all data in a single node, which would allow all updates to 
verify the reservation atomically, but that is not a change for this PR. It would be 
a follow-on issue.
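   A minimal sketch of the check inside the lambda, assuming a hypothetical `NodeValue` that holds only a status string and a nullable reservation UUID, and a hypothetical compare-and-set `mutateExisting` over an `AtomicReference` standing in for the ZooReaderWriter call. Throwing from the lambda aborts the update, so a status is only written while the node still carries this transaction's reservation.

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical sketch of verifying the reservation inside setStatus's update lambda.
class SetStatusSketch {
  // Stand-in for the serialized ZK node value: a status plus an optional reservation.
  static final class NodeValue {
    final String status;
    final UUID reservation; // null means not reserved

    NodeValue(String status, UUID reservation) {
      this.status = status;
      this.reservation = reservation;
    }
  }

  // Stand-in for the ZK node itself.
  final AtomicReference<NodeValue> node = new AtomicReference<>();

  // Read-modify-write loop: re-run the mutator until the compare-and-set succeeds.
  NodeValue mutateExisting(UnaryOperator<NodeValue> mutator) {
    while (true) {
      NodeValue current = node.get();
      if (current == null) {
        throw new IllegalStateException("node does not exist");
      }
      NodeValue updated = mutator.apply(current); // an exception here means nothing is written
      if (node.compareAndSet(current, updated)) {
        return updated;
      }
    }
  }

  void setStatus(UUID ourReservation, String newStatus) {
    mutateExisting(current -> {
      // A reservation must exist and must be the one this in-memory tx store holds.
      if (current.reservation == null || !current.reservation.equals(ourReservation)) {
        throw new IllegalStateException(
            "Expected reservation " + ourReservation + " but node has " + current.reservation);
      }
      return new NodeValue(newStatus, current.reservation);
    });
  }
}
```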



##########
core/src/main/java/org/apache/accumulo/core/fate/MetaFateStore.java:
##########
@@ -106,13 +112,102 @@ public FateId create() {
   @Override
   protected void create(FateId fateId, FateKey key) {
     try {
-      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, key).serialize(),
+      zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW, null, key).serialize(),
           NodeExistsPolicy.FAIL);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException(e);
     }
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // uniquely identify this attempt to reserve the fate operation data
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    try {
+      byte[] newSerNodeVal = zk.mutateExisting(getTXPath(fateId), currSerNodeVal -> {
+        NodeValue currNodeVal = new NodeValue(currSerNodeVal);
+        // The uuid handles the case where there was a ZK server fault and the write for this thread
+        // went through but that was not acknowledged, and we are reading our own write for 2nd
+        // time.
+        if (!currNodeVal.isReserved() || (currNodeVal.isReserved()
+            && currNodeVal.reservation.orElseThrow().equals(reservation))) {
+          FateKey currFateKey = currNodeVal.fateKey.orElse(null);
+          // Add the FateReservation to the node to reserve
+          return new NodeValue(currNodeVal.status, reservation, 
currFateKey).serialize();
+        } else {
+          // This will not change the value to null but will return null
+          return null;
+        }
+      });
+      if (newSerNodeVal != null) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      } else {
+        return Optional.empty();
+      }
+    } catch (InterruptedException | KeeperException | AcceptableThriftTableOperationException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    boolean isReserved;
+    try {
+      isReserved = getNode(fateId).isReserved();
+    } catch (Exception e) {
+      // Exception thrown, so node doesn't exist, so it is not reserved
+      isReserved = false;
+    }

Review Comment:
   If possible, this catch should be narrower. We only want to ignore exceptions that would be 
expected when the node does not exist. Other exceptions should bubble up so bugs can be 
detected. It would also be nice to log something when ignoring an exception.
   
   ```suggestion
       } catch (NoNodeException e) {
         log.trace("Saw {} when checking reservation of {}", e.getMessage(), fateId);
         isReserved = false;
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
   ```



##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+// TODO 4131 could potentially have separate classes for testing MetaFateStore and UserFateStore
+// similar to how FateTestRunner is used, however that interface doesn't work as nicely here
+// since we are using multiple stores instead of just one. Can do something similar to
+public class MultipleStoresIT extends SharedMiniClusterBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MultipleStoresIT.class);
+  @TempDir
+  private static File tempDir;
+  private static ZooKeeperTestingServer szk = null;
+  private static ZooReaderWriter zk;
+  private static final String FATE_DIR = "/fate";
+  private ClientContext client;
+
+  @BeforeEach
+  public void beforeEachSetup() {
+    client = (ClientContext) Accumulo.newClient().from(getClientProps()).build();
+  }
+
+  @AfterEach
+  public void afterEachTeardown() {
+    client.close();
+  }
+
+  @BeforeAll
+  public static void beforeAllSetup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+    szk = new ZooKeeperTestingServer(tempDir);
+    zk = szk.getZooReaderWriter();
+  }
+
+  @AfterAll
+  public static void afterAllTeardown() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    szk.close();
+  }
+
+  @Test
+  public void testReserveUnreserve() throws Exception {
+    testReserveUnreserve(FateInstanceType.META);
+    testReserveUnreserve(FateInstanceType.USER);
+  }
+
+  private void testReserveUnreserve(FateInstanceType storeType) throws Exception {
+    // reserving/unreserving a FateId should be reflected across instances of the stores
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> activeReservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    // Create the fate ids using store1
+    for (int i = 0; i < numFateIds; i++) {
+      assertTrue(allIds.add(store1.create()));
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Reserve half the fate ids using store1 and rest using store2, after reserving a fate id in
+    // one, should not be able to reserve the same in the other. Should also not matter that all the
+    int count = 0;
+    for (FateId fateId : allIds) {
+      if (count % 2 == 0) {
+        reservations.add(store1.reserve(fateId));
+        assertTrue(store2.tryReserve(fateId).isEmpty());
+      } else {
+        reservations.add(store2.reserve(fateId));
+        assertTrue(store1.tryReserve(fateId).isEmpty());
+      }
+      count++;
+    }
+    // Both stores should return the same reserved transactions
+    activeReservations = store1.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+    activeReservations = store2.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+
+    // Test setting/getting the TStatus and unreserving the transactions
+    for (int i = 0; i < allIds.size(); i++) {
+      var reservation = reservations.get(i);
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
+      reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
+      assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus());
+      reservation.delete();
+      reservation.unreserve(Duration.ofMillis(0));
+      // Attempt to set a status on a tx that has been unreserved (should throw exception)
+      assertThrows(IllegalStateException.class,
+          () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
+    }
+    assertTrue(store1.getActiveReservations().isEmpty());
+    assertTrue(store2.getActiveReservations().isEmpty());
+  }
+
+  @Test
+  public void testReserveNonExistentTxn() throws Exception {
+    testReserveNonExistentTxn(FateInstanceType.META);
+    testReserveNonExistentTxn(FateInstanceType.USER);
+  }
+
+  private void testReserveNonExistentTxn(FateInstanceType storeType) throws Exception {
+    // Tests that reserve() doesn't hang indefinitely and instead throws an error
+    // on reserve() a non-existent transaction.
+    final FateStore<SleepingTestEnv> store;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final String tableName = getUniqueNames(1)[0];
+    final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId));
+  }
+
+  @Test
+  public void testReserveReservedAndUnreserveUnreserved() throws Exception {
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.META);
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER);
+  }
+
+  private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Try to reserve again, should not reserve
+    for (FateId fateId : allIds) {
+      assertTrue(store.tryReserve(fateId).isEmpty());
+    }
+
+    // Unreserve all the FateIds
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+    // Try to unreserve again (should throw exception)
+    for (var reservation : reservations) {
+      assertThrows(IllegalStateException.class, () -> reservation.unreserve(Duration.ofMillis(0)));
+    }
+  }
+
+  @Test
+  public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception {
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.META);
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER);
+  }
+
+  private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Unreserve all
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+
+    // Ensure they can be reserved again, and delete and unreserve this time
+    for (FateId fateId : allIds) {
+      // Verify that the tx status is still NEW after unreserving since it hasn't been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus());
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservation.orElseThrow().delete();
+      reservation.orElseThrow().unreserve(Duration.ofMillis(0));
+    }
+
+    for (FateId fateId : allIds) {
+      // Verify that the tx is now unknown since it has been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus());
+      // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely
+      assertThrows(IllegalStateException.class, () -> store.reserve(fateId));
+    }
+  }
+
+  @Test
+  public void testMultipleFateInstances() throws Exception {
+    testMultipleFateInstances(FateInstanceType.META);
+    testMultipleFateInstances(FateInstanceType.USER);
+  }
+
+  private void testMultipleFateInstances(FateInstanceType storeType) throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
+    final SleepingTestEnv testEnv2 = new SleepingTestEnv(50);
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    Fate<SleepingTestEnv> fate1 =
+        new Fate<>(testEnv1, store1, Object::toString, DefaultConfiguration.getInstance());
+    fate1.startDeadReservationCleaner();
+    Fate<SleepingTestEnv> fate2 =
+        new Fate<>(testEnv2, store2, Object::toString, DefaultConfiguration.getInstance());
+    fate2.startDeadReservationCleaner();
+
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId;
+      // Start half the txns using fate1, and the other half using fate2
+      if (i % 2 == 0) {
+        fateId = fate1.startTransaction();
+        fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test");
+      } else {
+        fateId = fate2.startTransaction();
+        fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test");
+      }
+      allIds.add(fateId);
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Should be able to wait for completion on any fate instance
+    for (FateId fateId : allIds) {
+      fate2.waitForCompletion(fateId);
+    }
+    // Ensure that all txns have been executed and have only been executed once
+    assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps));
+    assertEquals(allIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps));
+
+    fate1.shutdown(1, TimeUnit.MINUTES);
+    fate2.shutdown(1, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testDeadReservationsCleanup() throws Exception {
+    testDeadReservationsCleanup(FateInstanceType.META);
+    testDeadReservationsCleanup(FateInstanceType.USER);
+  }
+
+  private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exception {
+    // Tests reserving some transactions, then simulating that the Manager died by creating
+    // a new Fate instance and store with a new LockID. The transactions which were
+    // reserved using the old LockID should be cleaned up by Fate's DeadReservationCleaner,
+
+    final String tableName = getUniqueNames(1)[0];
+    // One transaction for each FATE worker thread
+    final int numFateIds =
+        Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<LatchTestEnv> mockedStore1, store2;
+    final LatchTestEnv testEnv1 = new LatchTestEnv();
+    final LatchTestEnv testEnv2 = new LatchTestEnv();
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> reservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      mockedStore1 = EasyMock.createMockBuilder(UserFateStore.class)
+          .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class)
+          .withArgs(client, tableName, lock1).addMockedMethod("isDeadReservation").createMock();
+    } else {
+      mockedStore1 = EasyMock.createMockBuilder(MetaFateStore.class)
+          .withConstructor(String.class, ZooReaderWriter.class, ZooCache.class,
+              ZooUtil.LockID.class)
+          .withArgs(FATE_DIR, zk, client.getZooCache(), lock1).addMockedMethod("isDeadReservation")
+          .createMock();

Review Comment:
   Not sure about this, but the following pattern may be simpler for this test than mocking, 
where liveLocks is a `Set<ZooUtil.LockID>` that the test can add to and remove from as 
needed.
   
   ```suggestion
         mockedStore1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1) {
           @Override
           public boolean isDeadReservation(FateReservation reservation) {
             return !liveLocks.contains(reservation.getLockID());
           }
         };
   ```
   
   Another thing to consider is passing a `Predicate<ZooUtil.LockID>` to the Fate store 
constructors that can be used to check if a lock is live. Then the `isDeadReservation()` method 
could be dropped and the stores could use this predicate internally. The test could pass in a 
predicate that checks a set. The real code could pass in a predicate that calls the ServiceLock 
code.
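   A rough sketch of the predicate approach, with hypothetical `Store`/`Reservation` types and plain `String` lock ids standing in for `ZooUtil.LockID`; the real store constructor signatures are not shown in this excerpt.

```java
import java.util.Objects;
import java.util.function.Predicate;

// Hypothetical sketch: the store is handed a lock-liveness predicate instead of a test
// overriding or mocking isDeadReservation().
class LivenessPredicateSketch {
  // Stand-in for FateReservation: the holder's lock id plus a per-attempt id.
  static final class Reservation {
    final String lockId;
    final String attemptId;

    Reservation(String lockId, String attemptId) {
      this.lockId = Objects.requireNonNull(lockId);
      this.attemptId = Objects.requireNonNull(attemptId);
    }
  }

  static final class Store {
    private final Predicate<String> isLockLive;

    Store(Predicate<String> isLockLive) {
      this.isLockLive = Objects.requireNonNull(isLockLive);
    }

    // Internal helper; it no longer needs to be an overridable or mockable method.
    boolean isDeadReservation(Reservation reservation) {
      return !isLockLive.test(reservation.lockId);
    }
  }
}
```

   A test could construct `new Store(liveLocks::contains)` and remove a lock id from `liveLocks` to simulate a process dying, while production code would pass a predicate backed by the ServiceLock check.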
   



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +154,76 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was written. We
+      // need to check if the mutation was written and if it was written by this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      status = newMutator(fateId).requireReserved(reservation).tryMutate();
+      if (status.equals(FateMutator.Status.ACCEPTED)) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      }

Review Comment:
   It's possible that this status could also be UNKNOWN, but I don't think we need to 
loop and try writing again in this case. I think we can avoid attempting 
another write here altogether (because this is a tryReserve method) and just 
scan and read the reservation column. If it is set to this attempt's reservation, then we can 
return the operation with the reservation; otherwise we can return empty(). 
That avoids looping or retrying.
   
   ```suggestion
        var persistedReservation = // todo scan and read reservation
        if(reservation.equals(persistedReservation)){
           return Optional.of(new FateTxStoreImpl(fateId, reservation));
        } // else fall through and return empty()
   ```
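   A self-contained sketch of that flow, assuming a hypothetical in-memory `reservationColumn` map in place of the fate table and a hypothetical `loseNextAck` test hook that applies a write but reports it as UNKNOWN. Because each attempt uses a fresh UUID, a single read is enough to tell whether this attempt's write landed; this is not the `FateMutator` API.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of tryReserve() resolving an UNKNOWN write by reading, not rewriting.
class TryReserveSketch {
  enum Status { ACCEPTED, REJECTED, UNKNOWN }

  // Stand-in for the fate table's reservation column: fate id -> reservation.
  final Map<String,UUID> reservationColumn = new ConcurrentHashMap<>();
  // Test hook: the next write is applied but reported as UNKNOWN (a lost acknowledgement).
  boolean loseNextAck = false;

  // Conditional write: only succeeds when no reservation is present.
  Status putReservedTx(String fateId, UUID reservation) {
    boolean written = reservationColumn.putIfAbsent(fateId, reservation) == null;
    if (loseNextAck) {
      loseNextAck = false;
      return Status.UNKNOWN;
    }
    return written ? Status.ACCEPTED : Status.REJECTED;
  }

  Optional<UUID> tryReserve(String fateId) {
    UUID reservation = UUID.randomUUID(); // unique per attempt
    Status status = putReservedTx(fateId, reservation);
    if (status == Status.ACCEPTED) {
      return Optional.of(reservation);
    }
    if (status == Status.UNKNOWN) {
      // No second write and no loop: read the column once and compare.
      UUID persisted = reservationColumn.get(fateId);
      if (reservation.equals(persisted)) {
        return Optional.of(reservation);
      }
    }
    return Optional.empty(); // REJECTED, or UNKNOWN and the persisted value is not ours
  }
}
```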



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -502,4 +520,14 @@ protected Serializable deserializeTxInfo(TxInfo txInfo, 
byte[] data) {
       throw new IllegalStateException("Bad node data " + txInfo);
     }
   }
+
+  /**
+   * TODO 4131 this is a temporary method used to create a dummy lock when using a FateStore outside

Review Comment:
   
   This would be a follow-on issue. We can have a path in ZK for utilities to 
get a ZK lock. Currently I think some of the fate admin utilities that change the 
fate store require that no manager process is running. These utilities 
could be changed to get a lock at this utility path in ZK and reserve the fate txs 
they operate on, instead of requiring that no manager process is running.



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +154,76 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was written. We
+      // need to check if the mutation was written and if it was written by this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      status = newMutator(fateId).requireReserved(reservation).tryMutate();
+      if (status.equals(FateMutator.Status.ACCEPTED)) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    return newMutator(fateId).requireReserved().tryMutate().equals(FateMutator.Status.ACCEPTED);

Review Comment:
   This should not write a mutation to check if something is reserved; it should 
scan and read the reserved column. The server-side lock of a conditional 
mutation does not offer any benefit here, because after this method returns and 
something uses the returned value, the lock is no longer held. So we may as well 
scan in this case.
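   For illustration, a read-only check could look like the following, with a hypothetical map from fate id to the reservation column's value standing in for a scan of that column, and an assumed encoding where an absent or empty value means unreserved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: isReserved() as a plain read of the reservation column, no mutation.
class IsReservedSketch {
  // Stand-in for the fate table's reservation column: fate id -> serialized reservation.
  final Map<String,String> reservationColumn = new ConcurrentHashMap<>();

  boolean isReserved(String fateId) {
    String value = reservationColumn.get(fateId);
    // Assumed encoding: absent or empty means unreserved. No lock is held after returning,
    // so this is a point-in-time answer, just as a conditional-mutation check would be.
    return value != null && !value.isEmpty();
  }
}
```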



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -151,6 +154,76 @@ protected void create(FateId fateId, FateKey fateKey) {
         + " and fateKey " + fateKey + " after " + maxAttempts + " attempts");
   }
 
+  @Override
+  public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
+    // Create a unique FateReservation for this reservation attempt
+    FateReservation reservation = FateReservation.from(lockID, UUID.randomUUID());
+
+    FateMutator.Status status = newMutator(fateId).putReservedTx(reservation).tryMutate();
+    if (status.equals(FateMutator.Status.ACCEPTED)) {
+      return Optional.of(new FateTxStoreImpl(fateId, reservation));
+    } else if (status.equals(FateMutator.Status.UNKNOWN)) {
+      // If the status is UNKNOWN, this means an error occurred after the mutation was
+      // sent to the TabletServer, and it is unknown if the mutation was written. We
+      // need to check if the mutation was written and if it was written by this
+      // attempt at reservation. If it was written by this reservation attempt,
+      // we can return the FateTxStore since it was successfully reserved in this
+      // attempt, otherwise we return empty (was written by another reservation
+      // attempt or was not written at all).
+      status = newMutator(fateId).requireReserved(reservation).tryMutate();
+      if (status.equals(FateMutator.Status.ACCEPTED)) {
+        return Optional.of(new FateTxStoreImpl(fateId, reservation));
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean isReserved(FateId fateId) {
+    return newMutator(fateId).requireReserved().tryMutate().equals(FateMutator.Status.ACCEPTED);
+  }
+
+  @Override
+  public Map<FateId,FateReservation> getActiveReservations() {
+    Map<FateId,FateReservation> activeReservations = new HashMap<>();
+
+    try (Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY)) {
+      scanner.setRange(new Range());
+      scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
+          TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
+      scanner.stream()
+          .filter(entry -> FateReservation.isFateReservation(entry.getValue().toString()))
+          .forEach(entry -> {
+            String reservationColVal = entry.getValue().toString();
+            FateId fateId = FateId.from(fateInstanceType, entry.getKey().getRow().toString());
+            FateReservation reservation = FateReservation.from(reservationColVal);
+            activeReservations.put(fateId, reservation);
+          });
+    } catch (TableNotFoundException e) {
+      throw new IllegalStateException(tableName + " not found!", e);
+    }
+
+    return activeReservations;
+  }
+
+  @Override
+  public void deleteDeadReservations() {
+    for (Entry<FateId,FateReservation> entry : getActiveReservations().entrySet()) {
+      FateId fateId = entry.getKey();
+      FateReservation reservation = entry.getValue();
+      if (isDeadReservation(reservation)) {
+        newMutator(fateId).putUnreserveTx(reservation).tryMutate();
+        // No need to check the status... If it is ACCEPTED, we have successfully unreserved

Review Comment:
   Nice description
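The UNKNOWN branch of `tryReserve` above handles a conditional write whose outcome is ambiguous: a unique token is generated per attempt, and if the result is UNKNOWN, a second, condition-only check asks whether the stored reservation equals *this attempt's* token. A minimal sketch of that pattern, assuming a hypothetical in-memory `ReservationCell` in place of the real `FateMutator` and conditional writer (`WriteStatus`, `ReservationCell`, and `Reserver` are illustrative names, not Accumulo API):

```java
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for FateMutator.Status; not the Accumulo enum.
enum WriteStatus {
  ACCEPTED, REJECTED, UNKNOWN
}

// Hypothetical in-memory stand-in for one transaction's reservation column.
// A real store would issue conditional mutations to a table or ZooKeeper.
class ReservationCell {
  private final AtomicReference<UUID> reservation = new AtomicReference<>();
  // When true, the next put is applied (if its condition holds) but reported
  // as UNKNOWN, simulating a failure after the mutation reached the server.
  volatile boolean loseNextAck = false;

  WriteStatus putIfUnreserved(UUID token) {
    boolean applied = reservation.compareAndSet(null, token);
    if (loseNextAck) {
      loseNextAck = false;
      return WriteStatus.UNKNOWN;
    }
    return applied ? WriteStatus.ACCEPTED : WriteStatus.REJECTED;
  }

  // Condition-only check: ACCEPTED iff the stored reservation equals token.
  WriteStatus requireReserved(UUID token) {
    return token.equals(reservation.get()) ? WriteStatus.ACCEPTED : WriteStatus.REJECTED;
  }
}

class Reserver {
  static Optional<UUID> tryReserve(ReservationCell cell) {
    // A fresh token per attempt distinguishes our write from any other attempt's.
    UUID token = UUID.randomUUID();
    WriteStatus status = cell.putIfUnreserved(token);
    if (status == WriteStatus.ACCEPTED) {
      return Optional.of(token);
    }
    // Outcome unknown: the write may or may not have landed. Only claim the
    // reservation if the stored value is exactly this attempt's token.
    if (status == WriteStatus.UNKNOWN
        && cell.requireReserved(token) == WriteStatus.ACCEPTED) {
      return Optional.of(token);
    }
    return Optional.empty();
  }
}
```

Because the token is unique per attempt, a retry that finds some other attempt's reservation reports failure instead of mistaking that write for its own.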



##########
core/src/main/java/org/apache/accumulo/core/fate/Fate.java:
##########
@@ -313,6 +317,26 @@ private void undo(FateId fateId, Repo<T> op) {
 
   }
 
+  /**
+   * A thread that finds reservations held by dead processes and unreserves them. Only one thread
+   * runs per store type across all Fate instances (one to clean up dead reservations for
+   * {@link org.apache.accumulo.core.fate.user.UserFateStore UserFateStore} and one to clean up dead
+   * reservations for {@link MetaFateStore}).
+   */
+  private class DeadReservationCleaner implements Runnable {
+    // TODO 4131 periodic check runs every 30 seconds
+    // Should this be longer? Shorter? A configurable Property? A function of something?
+    private static final long INTERVAL_MILLIS = 30_000;
+
+    @Override
+    public void run() {
+      while (keepRunning.get()) {
+        store.deleteDeadReservations();

Review Comment:
   I have not looked into it, but we need to understand what happens if this thread dies or throws an exception. Accumulo server processes can designate some threads as critical, so that the death of such a thread essentially causes the manager process to die. We may want to do that for this thread, if it's not already done.
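On the question of what happens if the `DeadReservationCleaner` thread throws: one option is to catch and record ordinary exceptions per pass so a transient failure does not silently end the loop, and let anything more serious escape to an uncaught-exception handler that can halt the process. A JDK-only sketch, assuming a hypothetical `CleanerLoop` class and `onFatal` hook; it does not show Accumulo's own mechanism for designating critical threads:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical periodic cleaner; cleanupPass would be something like
// store::deleteDeadReservations in the code under review.
class CleanerLoop implements Runnable {
  private final Runnable cleanupPass;
  private final AtomicBoolean keepRunning;
  private final long intervalMillis;
  final AtomicInteger failedPasses = new AtomicInteger();

  CleanerLoop(Runnable cleanupPass, AtomicBoolean keepRunning, long intervalMillis) {
    this.cleanupPass = cleanupPass;
    this.keepRunning = keepRunning;
    this.intervalMillis = intervalMillis;
  }

  @Override
  public void run() {
    while (keepRunning.get()) {
      try {
        cleanupPass.run();
      } catch (RuntimeException e) {
        // Transient failure: count it (a real implementation would log it)
        // and retry on the next pass instead of letting the thread exit.
        failedPasses.incrementAndGet();
      }
      try {
        TimeUnit.MILLISECONDS.sleep(intervalMillis);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return; // treat interruption as a shutdown request
      }
    }
  }

  // Runs the loop on a daemon thread. Errors are not caught by run(), so they
  // reach onFatal, where a server could log and halt the process.
  static Thread start(CleanerLoop loop, Consumer<Throwable> onFatal) {
    Thread t = new Thread(loop, "dead-reservation-cleaner");
    t.setDaemon(true);
    t.setUncaughtExceptionHandler((thread, err) -> onFatal.accept(err));
    t.start();
    return t;
  }
}
```

Whether transient failures should be retried indefinitely or escalated after some count is a design choice; this sketch retries forever and only escalates `Error`s.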



##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -272,8 +340,12 @@ public FateInstanceType type() {
 
   private class FateTxStoreImpl extends AbstractFateTxStoreImpl<T> {
 
-    private FateTxStoreImpl(FateId fateId, boolean isReserved) {
-      super(fateId, isReserved);
+    private FateTxStoreImpl(FateId fateId) {
+      super(fateId);
+    }
+
+    private FateTxStoreImpl(FateId fateId, FateReservation reservation) {

Review Comment:
   This would be a follow-on issue, not something for this PR. For all of the writes made for a Fate tx, we can now add a condition requiring the reservation to be set.
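The follow-on suggestion amounts to a fencing check: every write for a transaction carries the condition that the reservation column still holds the writer's own reservation, so a process whose reservation was cleared (for example by the dead-reservation cleaner) cannot overwrite state written by the new holder. A minimal in-memory sketch, assuming a hypothetical `TxRecord` class in which each synchronized method stands in for one conditional mutation:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical in-memory record for one FATE transaction. In a real store,
// each method body would be a single conditional mutation.
class TxRecord {
  private UUID reservation; // null when unreserved
  private final Map<String,String> columns = new HashMap<>();

  synchronized boolean reserve(UUID token) {
    if (reservation != null) {
      return false;
    }
    reservation = token;
    return true;
  }

  // Models a cleaner clearing a reservation it believes is dead.
  synchronized void forceUnreserve() {
    reservation = null;
  }

  // Applies the write only if the caller still holds the reservation, i.e. the
  // condition "reservation column == mine" attached to every write.
  synchronized boolean writeIfReserved(UUID mine, String column, String value) {
    if (!mine.equals(reservation)) {
      return false; // stale holder: reservation cleared or now held by another
    }
    columns.put(column, value);
    return true;
  }

  synchronized String read(String column) {
    return columns.get(column);
  }
}
```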



##########
test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java:
##########
@@ -0,0 +1,571 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.fate;
+
+import static org.apache.accumulo.test.fate.user.UserFateStoreIT.createFateTable;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.replay;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.fate.Fate;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.fate.user.UserFateStore;
+import org.apache.accumulo.core.fate.zookeeper.ZooCache;
+import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.harness.SharedMiniClusterBase;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
+import org.easymock.EasyMock;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+// TODO 4131 could potentially have separate classes for testing MetaFateStore and UserFateStore
+// similar to how FateTestRunner is used, however that interface doesn't work as nicely here
+// since we are using multiple stores instead of just one. Can do something similar to
+// FateTestRunner here if desired
+public class MultipleStoresIT extends SharedMiniClusterBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MultipleStoresIT.class);
+  @TempDir
+  private static File tempDir;
+  private static ZooKeeperTestingServer szk = null;
+  private static ZooReaderWriter zk;
+  private static final String FATE_DIR = "/fate";
+  private ClientContext client;
+
+  @BeforeEach
+  public void beforeEachSetup() {
+    client = (ClientContext) Accumulo.newClient().from(getClientProps()).build();
+  }
+
+  @AfterEach
+  public void afterEachTeardown() {
+    client.close();
+  }
+
+  @BeforeAll
+  public static void beforeAllSetup() throws Exception {
+    SharedMiniClusterBase.startMiniCluster();
+    szk = new ZooKeeperTestingServer(tempDir);
+    zk = szk.getZooReaderWriter();
+  }
+
+  @AfterAll
+  public static void afterAllTeardown() throws Exception {
+    SharedMiniClusterBase.stopMiniCluster();
+    szk.close();
+  }
+
+  @Test
+  public void testReserveUnreserve() throws Exception {
+    testReserveUnreserve(FateInstanceType.META);
+    testReserveUnreserve(FateInstanceType.USER);
+  }
+
+  private void testReserveUnreserve(FateInstanceType storeType) throws Exception {
+    // reserving/unreserving a FateId should be reflected across instances of the stores
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> activeReservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    // Create the fate ids using store1
+    for (int i = 0; i < numFateIds; i++) {
+      assertTrue(allIds.add(store1.create()));
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Reserve half the fate ids using store1 and rest using store2, after reserving a fate id in
+    // one, should not be able to reserve the same in the other. Should also not matter that all the
+    // ids were created using store1
+    int count = 0;
+    for (FateId fateId : allIds) {
+      if (count % 2 == 0) {
+        reservations.add(store1.reserve(fateId));
+        assertTrue(store2.tryReserve(fateId).isEmpty());
+      } else {
+        reservations.add(store2.reserve(fateId));
+        assertTrue(store1.tryReserve(fateId).isEmpty());
+      }
+      count++;
+    }
+    // Both stores should return the same reserved transactions
+    activeReservations = store1.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+    activeReservations = store2.getActiveReservations();
+    assertEquals(allIds, activeReservations.keySet());
+
+    // Test setting/getting the TStatus and unreserving the transactions
+    for (int i = 0; i < allIds.size(); i++) {
+      var reservation = reservations.get(i);
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
+      reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
+      assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED, reservation.getStatus());
+      reservation.delete();
+      reservation.unreserve(Duration.ofMillis(0));
+      // Attempt to set a status on a tx that has been unreserved (should throw exception)
+      assertThrows(IllegalStateException.class,
+          () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
+    }
+    assertTrue(store1.getActiveReservations().isEmpty());
+    assertTrue(store2.getActiveReservations().isEmpty());
+  }
+
+  @Test
+  public void testReserveNonExistentTxn() throws Exception {
+    testReserveNonExistentTxn(FateInstanceType.META);
+    testReserveNonExistentTxn(FateInstanceType.USER);
+  }
+
+  private void testReserveNonExistentTxn(FateInstanceType storeType) throws Exception {
+    // Tests that reserve() doesn't hang indefinitely and instead throws an error
+    // on reserve() a non-existent transaction.
+    final FateStore<SleepingTestEnv> store;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final String tableName = getUniqueNames(1)[0];
+    final FateId fakeFateId = FateId.from(storeType, UUID.randomUUID());
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    assertThrows(IllegalStateException.class, () -> store.reserve(fakeFateId));
+  }
+
+  @Test
+  public void testReserveReservedAndUnreserveUnreserved() throws Exception {
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.META);
+    testReserveReservedAndUnreserveUnreserved(FateInstanceType.USER);
+  }
+
+  private void testReserveReservedAndUnreserveUnreserved(FateInstanceType storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Try to reserve again, should not reserve
+    for (FateId fateId : allIds) {
+      assertTrue(store.tryReserve(fateId).isEmpty());
+    }
+
+    // Unreserve all the FateIds
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+    // Try to unreserve again (should throw exception)
+    for (var reservation : reservations) {
+      assertThrows(IllegalStateException.class, () -> reservation.unreserve(Duration.ofMillis(0)));
+    }
+  }
+
+  @Test
+  public void testReserveAfterUnreserveAndReserveAfterDeleted() throws Exception {
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.META);
+    testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType.USER);
+  }
+
+  private void testReserveAfterUnreserveAndReserveAfterDeleted(FateInstanceType storeType)
+      throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new ArrayList<>();
+    final FateStore<SleepingTestEnv> store;
+    final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store = new UserFateStore<>(client, tableName, lock);
+    } else {
+      store = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock);
+    }
+
+    // Create some FateIds and ensure that they can be reserved
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId = store.create();
+      assertTrue(allIds.add(fateId));
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservations.add(reservation.orElseThrow());
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Unreserve all
+    for (var reservation : reservations) {
+      reservation.unreserve(Duration.ofMillis(0));
+    }
+
+    // Ensure they can be reserved again, and delete and unreserve this time
+    for (FateId fateId : allIds) {
+      // Verify that the tx status is still NEW after unreserving since it hasn't been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.NEW, store.read(fateId).getStatus());
+      var reservation = store.tryReserve(fateId);
+      assertFalse(reservation.isEmpty());
+      reservation.orElseThrow().delete();
+      reservation.orElseThrow().unreserve(Duration.ofMillis(0));
+    }
+
+    for (FateId fateId : allIds) {
+      // Verify that the tx is now unknown since it has been deleted
+      assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN, store.read(fateId).getStatus());
+      // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely
+      assertThrows(IllegalStateException.class, () -> store.reserve(fateId));
+    }
+  }
+
+  @Test
+  public void testMultipleFateInstances() throws Exception {
+    testMultipleFateInstances(FateInstanceType.META);
+    testMultipleFateInstances(FateInstanceType.USER);
+  }
+
+  private void testMultipleFateInstances(FateInstanceType storeType) throws Exception {
+    final String tableName = getUniqueNames(1)[0];
+    final int numFateIds = 500;
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<SleepingTestEnv> store1, store2;
+    final SleepingTestEnv testEnv1 = new SleepingTestEnv(50);
+    final SleepingTestEnv testEnv2 = new SleepingTestEnv(50);
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      store1 = new UserFateStore<>(client, tableName, lock1);
+      store2 = new UserFateStore<>(client, tableName, lock2);
+    } else {
+      store1 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock1);
+      store2 = new MetaFateStore<>(FATE_DIR, zk, client.getZooCache(), lock2);
+    }
+
+    Fate<SleepingTestEnv> fate1 =
+        new Fate<>(testEnv1, store1, Object::toString, DefaultConfiguration.getInstance());
+    fate1.startDeadReservationCleaner();
+    Fate<SleepingTestEnv> fate2 =
+        new Fate<>(testEnv2, store2, Object::toString, DefaultConfiguration.getInstance());
+    fate2.startDeadReservationCleaner();
+
+    for (int i = 0; i < numFateIds; i++) {
+      FateId fateId;
+      // Start half the txns using fate1, and the other half using fate2
+      if (i % 2 == 0) {
+        fateId = fate1.startTransaction();
+        fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test");
+      } else {
+        fateId = fate2.startTransaction();
+        fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true, "test");
+      }
+      allIds.add(fateId);
+    }
+    assertEquals(numFateIds, allIds.size());
+
+    // Should be able to wait for completion on any fate instance
+    for (FateId fateId : allIds) {
+      fate2.waitForCompletion(fateId);
+    }
+    // Ensure that all txns have been executed and have only been executed once
+    assertTrue(Collections.disjoint(testEnv1.executedOps, testEnv2.executedOps));
+    assertEquals(allIds, Sets.union(testEnv1.executedOps, testEnv2.executedOps));
+
+    fate1.shutdown(1, TimeUnit.MINUTES);
+    fate2.shutdown(1, TimeUnit.MINUTES);
+  }
+
+  @Test
+  public void testDeadReservationsCleanup() throws Exception {
+    testDeadReservationsCleanup(FateInstanceType.META);
+    testDeadReservationsCleanup(FateInstanceType.USER);
+  }
+
+  private void testDeadReservationsCleanup(FateInstanceType storeType) throws Exception {
+    // Tests reserving some transactions, then simulating that the Manager died by creating
+    // a new Fate instance and store with a new LockID. The transactions which were
+    // reserved using the old LockID should be cleaned up by Fate's DeadReservationCleaner,
+    // then picked up by the new Fate/store.
+
+    final String tableName = getUniqueNames(1)[0];
+    // One transaction for each FATE worker thread
+    final int numFateIds =
+        Integer.parseInt(Property.MANAGER_FATE_THREADPOOL_SIZE.getDefaultValue());
+    final boolean isUserStore = storeType.equals(FateInstanceType.USER);
+    final Set<FateId> allIds = new HashSet<>();
+    final FateStore<LatchTestEnv> mockedStore1, store2;
+    final LatchTestEnv testEnv1 = new LatchTestEnv();
+    final LatchTestEnv testEnv2 = new LatchTestEnv();
+    final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
+    final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
+    Map<FateId,FateStore.FateReservation> reservations;
+
+    if (isUserStore) {
+      createFateTable(client, tableName);
+      mockedStore1 = EasyMock.createMockBuilder(UserFateStore.class)
+          .withConstructor(ClientContext.class, String.class, ZooUtil.LockID.class)
+          .withArgs(client, tableName, lock1).addMockedMethod("isDeadReservation").createMock();

Review Comment:
   I wonder if, instead of mocking, this test could create real locks in ZK, since it's running in an IT. This would make the test a bit more end to end. I looked a bit at ServiceLockIT to see what is needed to create a lock, and it does not look like just a few lines of code. Maybe some utility code could be shared with that class, but I'm not sure. If it's even workable, this could be a follow-on improvement. Not too sure about this suggestion, just posting it for discussion.
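Whether liveness comes from a mocked `isDeadReservation` or from real locks in ZooKeeper, the cleanup itself reduces to: snapshot the active reservations, test each holder's lock for liveness, and conditionally clear only the exact reservation that was observed. A sketch that injects the liveness check as a `Predicate`, so a test could supply either a stub or a check backed by real locks; `Reservation` and `DeadReservationSweeper` are hypothetical names, and a plain `String` lock id stands in for `ZooUtil.LockID`:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Predicate;

// Hypothetical reservation: the lock of the holding process plus a per-attempt id.
final class Reservation {
  final String lockId;
  final UUID attemptId;

  Reservation(String lockId, UUID attemptId) {
    this.lockId = lockId;
    this.attemptId = attemptId;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof Reservation)) {
      return false;
    }
    Reservation r = (Reservation) o;
    return lockId.equals(r.lockId) && attemptId.equals(r.attemptId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(lockId, attemptId);
  }
}

// Hypothetical sweeper; the liveness check is injected rather than mocked.
class DeadReservationSweeper {
  private final Map<String,Reservation> reservations = new HashMap<>(); // txId -> holder
  private final Predicate<String> lockIsLive;

  DeadReservationSweeper(Predicate<String> lockIsLive) {
    this.lockIsLive = lockIsLive;
  }

  synchronized boolean reserve(String txId, Reservation r) {
    return reservations.putIfAbsent(txId, r) == null;
  }

  synchronized Map<String,Reservation> activeReservations() {
    return new HashMap<>(reservations);
  }

  // Returns the ids of the transactions that were unreserved.
  List<String> deleteDeadReservations() {
    List<String> cleared = new ArrayList<>();
    for (Map.Entry<String,Reservation> e : activeReservations().entrySet()) {
      if (!lockIsLive.test(e.getValue().lockId)) {
        synchronized (this) {
          // Conditional remove: only clear the exact reservation we observed,
          // in case it changed since the snapshot was taken.
          if (reservations.remove(e.getKey(), e.getValue())) {
            cleared.add(e.getKey());
          }
        }
      }
    }
    return cleared;
  }
}
```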
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@accumulo.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
