denis-chudov commented on code in PR #7999:
URL: https://github.com/apache/ignite-3/pull/7999#discussion_r3110983133
##########
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java:
##########
@@ -89,10 +89,14 @@ public class ClientTransaction implements Transaction {
/** Transaction id. */
private final long id;
- /** The future used on repeated commit/rollback. */
+ /** The future is used on repeated direct (via API) commit/rollback. */
@IgniteToStringExclude
private volatile CompletableFuture<Void> finishFut;
+ /** The future is used when a transaction is finished implicitly on enlistment failure or kill. */
+ @IgniteToStringExclude
Review Comment:
I see that everywhere you expose finishFut, you now also check whether implicitRollbackFut exists. Why is that needed? Wouldn't it be simpler to have a single future and a flag like isImplicitRollback?
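A minimal sketch of the single-future alternative, for discussion only. `Finish`, `startFinish` and `isImplicitRollback()` are hypothetical names, not code from the PR. The future and the flag live in one holder published through a single `AtomicReference`, so a reader can never observe a future without knowing how it was triggered.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Hypothetical sketch: one finish future plus a flag saying whether the finish
 * was implicit (enlistment failure or kill) or requested directly via the API.
 */
public class SingleFinishFutureSketch {
    /** Holder for the finish future and how the finish was triggered. */
    static final class Finish {
        final CompletableFuture<Void> fut = new CompletableFuture<>();
        final boolean implicitRollback;

        Finish(boolean implicitRollback) {
            this.implicitRollback = implicitRollback;
        }
    }

    private final AtomicReference<Finish> finish = new AtomicReference<>();

    /** Installs a finish once; every later caller gets the already installed one. */
    Finish startFinish(boolean implicitRollback) {
        Finish candidate = new Finish(implicitRollback);
        Finish prev = finish.compareAndExchange(null, candidate);
        return prev == null ? candidate : prev;
    }

    /** True only if the transaction was finished implicitly. */
    boolean isImplicitRollback() {
        Finish f = finish.get();
        return f != null && f.implicitRollback;
    }

    public static void main(String[] args) {
        SingleFinishFutureSketch tx = new SingleFinishFutureSketch();
        Finish first = tx.startFinish(true);   // implicit rollback gets there first
        Finish second = tx.startFinish(false); // a later API call reuses the same future
        System.out.println(first == second);         // true
        System.out.println(tx.isImplicitRollback()); // true
    }
}
```

With this shape, every place that exposes the finish future reads one field instead of checking two.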
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java:
##########
@@ -581,6 +597,80 @@ private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId commitP
.build());
}
+ private <T> CompletableFuture<T> resolvePendingTransactions(
+ HybridTimestamp requestTime,
+ HybridTimestamp currentSafeTime,
+ Function<HybridTimestamp, CompletableFuture<T>> action
+ ) {
+ // Wait for currentSafeTime >= requestTime to avoid out-of-order transactions arrival.
+ if (currentSafeTime.compareTo(requestTime) < 0) {
+ return safeTime.waitFor(requestTime)
+ .thenComposeAsync(
+ unused -> resolvePendingTransactions(requestTime, safeTime.current(), action),
+ partitionOperationsExecutor);
+ }
+
+ assert currentSafeTime.compareTo(requestTime) >= 0 : "currentSafeTime < lowerBoundTimestamp";
+ assert currentSafeTime.compareTo(safeTime.current()) <= 0 : "currentSafeTime > safeTime";
+
+ // Stable committed snapshot is ensured after resolving pending transactions state.
+ UUID upperBoundTxId = TransactionIds.transactionId(currentSafeTime, Integer.MAX_VALUE, TxPriority.NORMAL);
+ ConcurrentNavigableMap<UUID, PendingTxContext> txToWait = pendingTransactions.headMap(upperBoundTxId, true);
+
+ if (!txToWait.isEmpty()) {
+ List<CompletableFuture<?>> futs = null;
+
+ for (Map.Entry<UUID, PendingTxContext> entry : txToWait.entrySet()) {
+ if (!entry.getValue().cleanupFut.isDone()) {
+ futs = futs == null ? new ArrayList<>() : futs;
+ futs.add(resolveTransactionState(entry.getKey(), entry.getValue(), currentSafeTime));
+ }
+ }
+
+ if (futs != null) {
+ return allOf(futs.toArray(CompletableFuture[]::new))
+ .thenComposeAsync(unused -> action.apply(currentSafeTime), partitionOperationsExecutor);
+ }
+ }
+
+ return action.apply(currentSafeTime);
+ }
+
+ private CompletableFuture<Void> resolveTransactionState(UUID txId, PendingTxContext txCtx, HybridTimestamp observableTimestamp) {
+ CompletableFuture<Void> resFut = new CompletableFuture<>();
+
+ txCtx.cleanupFut.whenComplete(copyStateTo(resFut));
+
+ return resFut.orTimeout(WAIT_FOR_CLEANUP_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS).handle((ignored, e) -> {
+ if (e == null) {
+ return CompletableFutures.<Void>nullCompletedFuture();
+ }
+
+ if (e instanceof TimeoutException) {
+ // Transaction was not cleaned up in time.
+ // Send tx state request to coordinator to bump commit timestamp for active txn beyond safe timestamp.
+ return transactionStateResolver.resolveTxState(txStateResolutionParameters()
+ .txId(txId)
+ .commitGroupId(txCtx.commitPartId)
+ .readTimestamp(observableTimestamp)
+ .build())
Review Comment:
This is the first case where we use resolveTxState without having a write intent. Unlike in `resolveWriteIntentReadability`, you don't have row info here, but transactionStateResolver will automatically redirect the request to the commit partition if there is no coordinator, and the commit partition will redirect it to the primary replica if it has no tx state meta. Are you sure it won't break in this case?
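For reference, the wait-then-fallback shape of `resolveTransactionState` can be sketched in isolation. This is a minimal sketch, not the PR's code: `waitOrResolve` and the `fallback` supplier are hypothetical, with `fallback` standing in for `transactionStateResolver.resolveTxState(...)`. It does not model the coordinator, commit partition and primary replica redirects the question above is about.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Supplier;

public class WaitOrResolveSketch {
    /**
     * Waits for cleanupFut up to timeoutMillis; on timeout, delegates to the
     * hypothetical fallback. Any other failure is propagated unchanged.
     */
    static CompletableFuture<Void> waitOrResolve(
            CompletableFuture<Void> cleanupFut,
            long timeoutMillis,
            Supplier<CompletableFuture<Void>> fallback
    ) {
        // Copy into a private future so that orTimeout cannot complete the shared cleanupFut.
        CompletableFuture<Void> resFut = new CompletableFuture<>();
        cleanupFut.whenComplete((v, e) -> {
            if (e != null) {
                resFut.completeExceptionally(e);
            } else {
                resFut.complete(v);
            }
        });

        return resFut.orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                .handle((ignored, e) -> {
                    if (e == null) {
                        return CompletableFuture.<Void>completedFuture(null);
                    }
                    Throwable cause = e instanceof CompletionException && e.getCause() != null ? e.getCause() : e;
                    if (cause instanceof TimeoutException) {
                        // Cleanup did not finish in time: ask for the state explicitly.
                        return fallback.get();
                    }
                    return CompletableFuture.<Void>failedFuture(cause);
                })
                .thenCompose(Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<Void> neverCleaned = new CompletableFuture<>();
        boolean[] fallbackCalled = {false};
        waitOrResolve(neverCleaned, 50, () -> {
            fallbackCalled[0] = true;
            return CompletableFuture.completedFuture(null);
        }).join();
        System.out.println(fallbackCalled[0]);     // true
        System.out.println(neverCleaned.isDone()); // false: the shared future is untouched
    }
}
```

Copying the state into a private future before calling `orTimeout` keeps the timeout from completing the shared `cleanupFut` exceptionally, which appears to be why the diff goes through `copyStateTo(resFut)`.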
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java:
##########
@@ -224,12 +224,22 @@ private CompletableFuture<TransactionResult> finishAndCleanup(
.map(entry -> new EnlistedPartitionGroup(entry.getKey(), entry.getValue().tableIds()))
.collect(toList());
return finishTransaction(enlistedPartitionGroups, txId, commit, commitTimestamp)
- .thenCompose(txResult -> {
+ .thenApply(txResult -> {
boolean actualCommit = txResult.transactionState() == COMMITTED;
HybridTimestamp actualCommitTs = txResult.commitTimestamp();
- return txManager.cleanup(replicationGroupId, enlistedPartitions, actualCommit, actualCommitTs, txId)
- .thenApply(v -> txResult);
+ try {
+ txManager.cleanup(replicationGroupId, enlistedPartitions, actualCommit, actualCommitTs, txId)
Review Comment:
We now call `releaseTxLocks` asynchronously, so another transaction may run into the locks of a previous transaction that has already finished. Is this the correct behavior? Should it be documented?
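The difference between the old `thenCompose` shape and the new `thenApply` shape can be shown with a toy model. This is a hypothetical sketch: lock release is modeled as an `AtomicBoolean` cleared by a `cleanup` future, and none of these names come from the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncCleanupSketch {
    /** Hypothetical cleanup: "releases locks" (clears the flag) once the trigger completes. */
    static CompletableFuture<Void> cleanup(CompletableFuture<Void> trigger, AtomicBoolean locksHeld) {
        return trigger.thenRun(() -> locksHeld.set(false));
    }

    /** Returns the finish result, either after cleanup (thenCompose) or right away (thenApply). */
    static CompletableFuture<String> finish(boolean awaitCleanup, CompletableFuture<Void> trigger, AtomicBoolean locksHeld) {
        CompletableFuture<String> txResult = CompletableFuture.completedFuture("COMMITTED");
        if (awaitCleanup) {
            // Old shape: the caller sees the result only after cleanup completes.
            return txResult.thenCompose(res -> cleanup(trigger, locksHeld).thenApply(v -> res));
        }
        // New shape: cleanup is started but not awaited.
        return txResult.thenApply(res -> {
            cleanup(trigger, locksHeld);
            return res;
        });
    }

    public static void main(String[] args) {
        AtomicBoolean locks = new AtomicBoolean(true);
        CompletableFuture<String> res = finish(false, new CompletableFuture<>(), locks);
        System.out.println(res.isDone() + " " + locks.get()); // true true

        AtomicBoolean locks2 = new AtomicBoolean(true);
        CompletableFuture<Void> trigger2 = new CompletableFuture<>();
        CompletableFuture<String> res2 = finish(true, trigger2, locks2);
        System.out.println(res2.isDone()); // false
        trigger2.complete(null);
        System.out.println(res2.isDone() + " " + locks2.get()); // true false
    }
}
```

In the `thenApply` case the finish result is already complete while the flag still reads `true`, which is exactly the window in which another transaction could hit the old locks.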
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java:
##########
@@ -1669,17 +1772,23 @@ private <T> CompletableFuture<T> continueResolvingByPk(
* Appends an operation to prevent the race between commit/rollback and the operation execution.
*
* @param txId Transaction id.
+ * @param commitPartitionId Commit partition id.
* @param requestType Request type.
* @param full {@code True} if a full transaction and can be immediately committed.
* @param op Operation closure.
* @return A future object representing the result of the given operation.
*/
private <T> CompletableFuture<T> appendTxCommand(
UUID txId,
+ @Nullable ZonePartitionId commitPartitionId,
RequestType requestType,
boolean full,
Supplier<CompletableFuture<T>> op
) {
+ PendingTxContext txCtx = requestType.isRwRead() ? null :
+ pendingTransactions.computeIfAbsent(txId,
+ id -> new PendingTxContext(new CompletableFuture<>(), commitPartitionId));
Review Comment:
`commitPartitionId` is not a nullable parameter of this constructor, and a null value may cause an exception later, when it is used in `TxStateResolutionTxStateResolutionParametersBuilder#build`. Why do you need the parameter to be nullable?
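A toy illustration of the concern, assuming a hypothetical `ParamsBuilder` that rejects a null field in `build()` (it is not the real builder used by transactionStateResolver) and a hypothetical `PendingCtx` standing in for `PendingTxContext`. A null accepted early only fails later, far from where it was introduced, whereas a `requireNonNull` check in the constructor fails at the call site.

```java
import java.util.Objects;
import java.util.UUID;

public class NullableCommitPartitionSketch {
    /** Hypothetical builder that rejects missing fields in build(). */
    static final class ParamsBuilder {
        private UUID txId;
        private String commitGroupId;

        ParamsBuilder txId(UUID txId) {
            this.txId = txId;
            return this;
        }

        ParamsBuilder commitGroupId(String commitGroupId) {
            this.commitGroupId = commitGroupId;
            return this;
        }

        String build() {
            Objects.requireNonNull(txId, "txId");
            Objects.requireNonNull(commitGroupId, "commitGroupId");
            return txId + "@" + commitGroupId;
        }
    }

    /** Hypothetical pending-tx context that fails fast instead of storing null. */
    static final class PendingCtx {
        final String commitPartId;

        PendingCtx(String commitPartId) {
            this.commitPartId = Objects.requireNonNull(commitPartId, "commitPartId");
        }
    }

    public static void main(String[] args) {
        String commitPartId = null; // e.g. a @Nullable parameter passed through unchanged

        // Late failure: the null is only rejected when the request is built.
        try {
            new ParamsBuilder().txId(UUID.randomUUID()).commitGroupId(commitPartId).build();
        } catch (NullPointerException e) {
            System.out.println("build() failed: " + e.getMessage()); // build() failed: commitGroupId
        }

        // Early failure: the constructor rejects the null where it is introduced.
        try {
            new PendingCtx(commitPartId);
        } catch (NullPointerException e) {
            System.out.println("constructor failed: " + e.getMessage()); // constructor failed: commitPartId
        }
    }
}
```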
--
This is an automated message from the Apache Git Service.