ascherbakoff commented on code in PR #7999:
URL: https://github.com/apache/ignite-3/pull/7999#discussion_r3121867054


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java:
##########
@@ -581,6 +597,80 @@ private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId commitP
                 .build());
     }
 
+    private <T> CompletableFuture<T> resolvePendingTransactions(
+            HybridTimestamp requestTime,
+            HybridTimestamp currentSafeTime,
+            Function<HybridTimestamp, CompletableFuture<T>> action
+    ) {
+        // Wait for currentSafeTime >= requestTime to avoid out-of-order transactions arrival.
+        if (currentSafeTime.compareTo(requestTime) < 0) {
+            return safeTime.waitFor(requestTime)
+                    .thenComposeAsync(
+                            unused -> resolvePendingTransactions(requestTime, safeTime.current(), action),
+                            partitionOperationsExecutor);
+        }
+
+        assert currentSafeTime.compareTo(requestTime) >= 0 : "currentSafeTime < lowerBoundTimestamp";
+        assert currentSafeTime.compareTo(safeTime.current()) <= 0 : "currentSafeTime > safeTime";
+
+        // Stable committed snapshot is ensured after resolving pending transactions state.
+        UUID upperBoundTxId = TransactionIds.transactionId(currentSafeTime, Integer.MAX_VALUE, TxPriority.NORMAL);
+        ConcurrentNavigableMap<UUID, PendingTxContext> txToWait = pendingTransactions.headMap(upperBoundTxId, true);
+
+        if (!txToWait.isEmpty()) {
+            List<CompletableFuture<?>> futs = null;
+
+            for (Map.Entry<UUID, PendingTxContext> entry : txToWait.entrySet()) {
+                if (!entry.getValue().cleanupFut.isDone()) {
+                    futs = futs == null ? new ArrayList<>() : futs;
+                    futs.add(resolveTransactionState(entry.getKey(), entry.getValue(), currentSafeTime));
+                }
+            }
+
+            if (futs != null) {
+                return allOf(futs.toArray(CompletableFuture[]::new))
+                        .thenComposeAsync(unused -> action.apply(currentSafeTime), partitionOperationsExecutor);
+            }
+        }
+
+        return action.apply(currentSafeTime);
+    }
+
+    private CompletableFuture<Void> resolveTransactionState(UUID txId, PendingTxContext txCtx, HybridTimestamp observableTimestamp) {
+        CompletableFuture<Void> resFut = new CompletableFuture<>();
+
+        txCtx.cleanupFut.whenComplete(copyStateTo(resFut));
+
+        return resFut.orTimeout(WAIT_FOR_CLEANUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS).handle((ignored, e) -> {
+            if (e == null) {
+                return CompletableFutures.<Void>nullCompletedFuture();
+            }
+
+            if (e instanceof TimeoutException) {
+                // Transaction was not cleaned up in time.
+                // Send tx state request to coordinator to bump commit timestamp for active txn beyond safe timestamp.
+                return transactionStateResolver.resolveTxState(txStateResolutionParameters()
+                                .txId(txId)
+                                .commitGroupId(txCtx.commitPartId)
+                                .readTimestamp(observableTimestamp)
+                                .build())

Review Comment:
   I'm not fully sure, but I don't think rowId is required for resolving the tx state.
   It's declared as nullable in TxStateResolutionParameters:
   `@Nullable RowId rowId,`
   The request should not go to the primary replica if no rowId is present.
   Also, there are some tests that don't declare rowId.
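
To illustrate the point, here is a minimal, self-contained sketch of a parameters object with an optional rowId. Everything in it is hypothetical: `ResolutionParams`, `Route`, and `route` are stand-ins invented for this example, not the real `TxStateResolutionParameters` or resolver code, and the routing rule simply encodes my assumption above that the primary replica is only involved when a rowId is supplied.

```java
import java.util.Objects;
import java.util.UUID;

/** Hypothetical stand-in types for illustration only; NOT the real Ignite classes. */
public class NullableRowIdSketch {
    /** Possible outcomes of the assumed routing rule. */
    enum Route { VIA_PRIMARY_REPLICA, SKIP_PRIMARY_REPLICA }

    /** Simplified parameters object where rowId is optional. */
    static final class ResolutionParams {
        final UUID txId;
        final String rowId; // May be null; a plain String replaces the real RowId type here.

        ResolutionParams(UUID txId, String rowId) {
            this.txId = Objects.requireNonNull(txId, "txId");
            this.rowId = rowId;
        }
    }

    /** Assumption from the comment above: the primary replica is only needed when a rowId is present. */
    static Route route(ResolutionParams params) {
        return params.rowId == null ? Route.SKIP_PRIMARY_REPLICA : Route.VIA_PRIMARY_REPLICA;
    }

    public static void main(String[] args) {
        UUID txId = UUID.randomUUID();

        // No rowId supplied: under the assumed rule, the primary replica is not contacted.
        System.out.println(route(new ResolutionParams(txId, null)));

        // rowId supplied: under the assumed rule, the primary replica path is taken.
        System.out.println(route(new ResolutionParams(txId, "row-1")));
    }
}
```

If the real resolver behaves like this, building the parameters without a rowId at this call site would be valid; that still needs to be checked against the actual resolver code.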



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
