keith-turner commented on code in PR #4204:
URL: https://github.com/apache/accumulo/pull/4204#discussion_r1484730251


##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -239,12 +254,101 @@ public int getDeferredCount() {
     }
   }
 
+  private Optional<FateId> create(FateKey fateKey) {
+    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
+
+    try {
+      create(fateId, fateKey);
+    } catch (IllegalStateException e) {
+      Pair<TStatus,Optional<FateKey>> statusAndKey = getStatusAndKey(fateId);
+      TStatus status = statusAndKey.getFirst();
+      Optional<FateKey> tFateKey = statusAndKey.getSecond();
+
+      // Case 1: Status is NEW so this is unseeded, we can return and allow the calling code
+      // to reserve/seed as long as the existing key is the same and not different as that would
+      // mean a collision
+      if (status == TStatus.NEW) {
+        Preconditions.checkState(tFateKey.isPresent(), "Tx Key is missing from tid %s",
+            fateId.getTid());
+        Preconditions.checkState(fateKey.equals(tFateKey.orElseThrow()),
+            "Collision detected for tid %s", fateId.getTid());
+        // Case 2: Status is some other state which means already in progress
+        // so we can just log and return empty optional
+      } else {
+        log.trace("Existing transaction {} already exists for key {} with status {}", fateId,
+            fateKey, status);
+        return Optional.empty();
+      }
+    }
+
+    return Optional.of(fateId);
+  }
+
+  @Override
+  public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+    FateId fateId = fateIdGenerator.fromTypeAndKey(getInstanceType(), fateKey);
+    final Optional<FateTxStore<T>> txStore;
+
+    // First make sure we can reserve in memory the fateId, if not
+    // we can return an empty Optional as it is reserved and in progress
+    // This reverses the usual order of creation and then reservation but
+    // this prevents a race condition by ensuring we can reserve first.
+    // This will create the FateTxStore before creation but this object
+    // is not exposed until after creation is finished so there should not
+    // be any errors.
+    final Optional<FateTxStore<T>> reservedTxStore;
+    synchronized (this) {
+      reservedTxStore = tryReserve(fateId);
+    }
+
+    // If present we were able to reserve so try and create
+    if (reservedTxStore.isPresent()) {
+      try {
+        if (create(fateKey).isPresent()) {
+          txStore = reservedTxStore;
+        } else {
+          // We already exist in a non-new state then un-reserve and an empty
+          // Optional will be returned. This is expected to happen when the
+          // system is busy and operations are not running, and we keep seeding them
+          synchronized (this) {
+            reserved.remove(fateId);
+          }
+          txStore = Optional.empty();
+        }
+      } catch (Exception e) {
+        // Clean up the reservation if the creation failed
+        // And then throw error
+        synchronized (this) {
+          reserved.remove(fateId);
+        }
+        if (e instanceof IllegalStateException) {
+          throw e;
+        } else {
+          throw new IllegalStateException(e);
+        }
+      }
+    } else {
+      // Could not reserve so return empty

Review Comment:
   I put the following together after seeing another log statement in this PR.
   It would be good to log something here, as the code does elsewhere. These
   trace statements will be useful for tracking down bugs.
   
   ```suggestion
         // Could not reserve so return empty
         log.trace("Another thread currently has transaction {} key {} reserved", fateId,
               fateKey);
   ```
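   
   To illustrate where the trace statements fit, here is a minimal, self-contained
   sketch of the reserve-then-create flow with a log line on each path that returns
   empty. `ReserveThenCreateSketch`, its `String` ids, and `markInProgress` are
   hypothetical stand-ins, not the real `AbstractFateStore` types, and it uses
   `java.util.logging` at `FINEST` in place of the project's `log.trace`.
   
   ```java
   import java.util.HashSet;
   import java.util.Optional;
   import java.util.Set;
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical stand-in for the store; not Accumulo's actual API.
   class ReserveThenCreateSketch {
     private static final Logger LOG =
         Logger.getLogger(ReserveThenCreateSketch.class.getName());

     private final Set<String> reserved = new HashSet<>();
     private final Set<String> inProgress = new HashSet<>();

     // Simulates a transaction that already exists in a non-NEW state.
     synchronized void markInProgress(String id) {
       inProgress.add(id);
     }

     synchronized boolean isReserved(String id) {
       return reserved.contains(id);
     }

     // Reserve in memory first, then attempt creation; un-reserve if creation is refused.
     Optional<String> createAndReserve(String id, String key) {
       synchronized (this) {
         if (!reserved.add(id)) {
           // Could not reserve: this is the path the suggested trace statement covers.
           LOG.log(Level.FINEST, "Another thread currently has transaction {0} key {1} reserved",
               new Object[] {id, key});
           return Optional.empty();
         }
       }

       boolean alreadyRunning;
       synchronized (this) {
         alreadyRunning = inProgress.contains(id);
       }
       if (alreadyRunning) {
         // Creation refused: clean up the reservation and log before returning empty.
         synchronized (this) {
           reserved.remove(id);
         }
         LOG.log(Level.FINEST, "Existing transaction {0} already exists for key {1}",
             new Object[] {id, key});
         return Optional.empty();
       }
       return Optional.of(id);
     }
   }
   ```
   
   With a distinct message on each empty return, a trace log shows whether a caller
   lost the reservation race or hit a transaction that was already in progress.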
   



##########
core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java:
##########
@@ -295,16 +296,28 @@ public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
     // This will create the FateTxStore before creation but this object
     // is not exposed until after creation is finished so there should not
     // be any errors.
+    final Optional<FateTxStore<T>> reservedTxStore;
     synchronized (this) {
-      txStore = tryReserve(fateId);
+      reservedTxStore = tryReserve(fateId);
     }
 
-    if (txStore.isPresent()) {
+    // If present we were able to reserve so try and create
+    if (reservedTxStore.isPresent()) {
       try {
-        Preconditions.checkState(create(fateKey) != null,
-            "Unexpected null FateId when creating and reserving fateKey %s", fateKey);
+        if (create(fateKey).isPresent()) {
+          txStore = reservedTxStore;

Review Comment:
   Could modify the code to keep the return value of create and validate it.
   
   ```suggestion
             Preconditions.checkState(fateId.equals(fateIdFromCreate));
             txStore = reservedTxStore;
   ```
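   
   A rough, self-contained sketch of what keeping and validating the return value
   might look like. Everything here is an assumption for illustration: ids and keys
   are plain `String`s, `create` is a hypothetical stub, and a plain
   `IllegalStateException` stands in for Guava's `Preconditions.checkState`.
   
   ```java
   import java.util.Optional;

   // Hypothetical illustration only; not the actual AbstractFateStore code.
   class ValidateCreateSketch {

     // Stub for create(fateKey): returns the id it derived from the key, or empty
     // when a transaction for that key is already in progress.
     static Optional<String> create(String fateKey, boolean alreadyInProgress) {
       return alreadyInProgress ? Optional.empty() : Optional.of("id-" + fateKey);
     }

     // Keeps the value returned by create and checks it against the id that was reserved.
     static Optional<String> createAndValidate(String fateId, String fateKey,
         boolean alreadyInProgress) {
       Optional<String> fateIdFromCreate = create(fateKey, alreadyInProgress);
       if (fateIdFromCreate.isPresent()) {
         if (!fateId.equals(fateIdFromCreate.get())) {
           throw new IllegalStateException(
               "Reserved " + fateId + " but create returned " + fateIdFromCreate.get());
         }
         return fateIdFromCreate;
       }
       return Optional.empty();
     }
   }
   ```
   
   The check costs little and would catch the reserved id and the created id ever
   being derived differently.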



##########
core/src/test/java/org/apache/accumulo/core/fate/TestStore.java:
##########
@@ -32,25 +32,54 @@
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 
+import org.apache.accumulo.core.util.Pair;
+
+import com.google.common.hash.HashCode;
+import com.google.common.hash.Hashing;
+
 /**
  * Transient in memory store for transactions.
  */
 public class TestStore implements FateStore<String> {
 
   private long nextId = 1;
-  private Map<FateId,TStatus> statuses = new HashMap<>();
-  private Set<FateId> reserved = new HashSet<>();
-
+  private final Map<FateId,Pair<TStatus,Optional<FateKey>>> statuses = new HashMap<>();
+  private final Map<FateId,Map<Fate.TxInfo,Serializable>> txInfos = new HashMap<>();
+  private final Set<FateId> reserved = new HashSet<>();
   private static final FateInstanceType fateInstanceType = FateInstanceType.USER;
-  private Map<FateId,Map<Fate.TxInfo,Serializable>> txInfos = new HashMap<>();
 
   @Override
   public FateId create() {
     FateId fateId = FateId.from(fateInstanceType, nextId++);
-    statuses.put(fateId, TStatus.NEW);
+    statuses.put(fateId, new Pair<>(TStatus.NEW, Optional.empty()));
     return fateId;
   }
 
+  @Override
+  public Optional<FateTxStore<String>> createAndReserve(FateKey key) {
+    HashCode hashCode = Hashing.murmur3_128().hashBytes(key.getSerialized());

Review Comment:
   This TestStore is only used by unit tests. If the existing unit tests do not
   call this method, it could simply throw an UnsupportedOperationException.
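   
   Something along these lines, as a self-contained sketch. `KeyedStoreSketch` and
   `TestStoreSketch` are hypothetical, trimmed-down stand-ins for `FateStore` and
   `TestStore`, with `String` in place of `FateKey` and `FateTxStore`.
   
   ```java
   import java.util.Optional;

   // Hypothetical, trimmed-down stand-in for the store interface.
   interface KeyedStoreSketch {
     Optional<String> createAndReserve(String key);
   }

   // Test-only store that does not implement the keyed path at all.
   class TestStoreSketch implements KeyedStoreSketch {
     @Override
     public Optional<String> createAndReserve(String key) {
       // No unit test is assumed to exercise this method, so fail loudly if one ever does.
       throw new UnsupportedOperationException();
     }
   }
   ```
   
   That avoids maintaining hashing logic in test code that nothing calls.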


