This is an automated email from the ASF dual-hosted git repository.

ascherbakoff pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 83b124b70ea IGNITE-25825 Async write intent cleanup (#7999)
83b124b70ea is described below

commit 83b124b70eaabca69f5a477c2294bfe0201aa4d3
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Thu Apr 23 13:43:40 2026 +0300

    IGNITE-25825 Async write intent cleanup (#7999)
---
 .../client/ItThinClientTransactionsTest.java       |   6 +-
 .../internal/client/tx/ClientTransaction.java      |  51 +++++--
 .../handlers/TxFinishReplicaRequestHandler.java    |  16 ++-
 .../ItPrimaryReplicaChoiceTest.java                |   3 +-
 .../internal/sql/engine/ItOrToUnionRuleTest.java   |   3 +
 .../ItTxAbstractDistributedTestSingleNode.java     |   4 +-
 ...xDistributedTestSingleNodeNoCleanupMessage.java |   1 +
 .../TablePartitionResourcesFactory.java            |   1 +
 .../DefaultTablePartitionReplicaProcessor.java     | 148 ++++++++++++++++++++-
 ...ePartitionReplicaProcessorIndexLockingTest.java |   1 +
 ...tionReplicaProcessorSortedIndexLockingTest.java |   1 +
 .../DefaultTablePartitionReplicaProcessorTest.java |   1 +
 .../ZonePartitionReplicaListenerTest.java          |   1 +
 .../storage/InternalTableEstimatedSizeTest.java    |   3 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |   1 +
 .../ignite/internal/table/TxAbstractTest.java      |   8 +-
 .../table/impl/DummyInternalTableImpl.java         |   1 +
 .../internal/tx/impl/ReadWriteTransactionImpl.java |  12 +-
 18 files changed, 223 insertions(+), 39 deletions(-)

diff --git 
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
 
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
index 7d87e7da0ad..b0d6c8b0e79 100644
--- 
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
+++ 
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
@@ -481,7 +481,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
         Tuple key2 = tuples.get(1);
         assertThat(ctx.put.apply(client, tx, key2), 
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is 
killed"));
 
-        assertThat(tx.commitAsync(), willSucceedFast());
+        assertThat(tx.commitAsync(), 
willThrowWithCauseOrSuppressed(TransactionException.class));
 
         // Validate lock possibility.
         assertThat(kvView.removeAllAsync(null, Arrays.asList(key, key2)), 
willSucceedFast());
@@ -594,7 +594,7 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
 
         await().atMost(3, TimeUnit.SECONDS).until(() -> 
tx.startedTx().killed());
 
-        assertThat(tx.commitAsync(), willSucceedFast());
+        assertThat(tx.commitAsync(), 
willThrowWithCauseOrSuppressed(TransactionException.class));
 
         // Validate lock possibility.
         assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key1, 
key2)), willSucceedFast());
@@ -1648,6 +1648,8 @@ public class ItThinClientTransactionsTest extends 
ItAbstractThinClientTest {
             assertThat(ctx.put.apply(client(), youngerTxProxy, key), 
willThrowWithCauseOrSuppressed(ctx.expectedErr));
         } else {
             assertThat(ctx.put.apply(client(), olderTxProxy, key2), 
willSucceedFast()); // Will invalidate younger tx.
+
+            await().atMost(2, TimeUnit.SECONDS).until(() -> 
youngerTxProxy.startedTx().killed());
             assertThat(youngerTxProxy.commitAsync(), 
willThrowWithCauseOrSuppressed(TransactionException.class));
         }
 
diff --git 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index b368b049fe2..78b30f3abaa 100644
--- 
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++ 
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -89,10 +89,20 @@ public class ClientTransaction implements Transaction {
     /** Node-local resource id for the Transaction. */
     private final long id;
 
-    /** The future used on repeated commit/rollback. */
+    /** The future is used on repeated direct (via API) commit/rollback. */
     @IgniteToStringExclude
     private volatile CompletableFuture<Void> finishFut;
 
+    /**
+     *  The future is used to track the outcome of an implicit rollback operation 
(for example, when a transaction is killed).
+     *  There is a scenario in which both futures are required:
+     *  1. an implicit rollback is in progress;
+     *  2. commit is called for the first time - the outcome of the implicit rollback future 
should be reported;
+     *  3. commit is called for the second time - a completed future should be 
reported (this is the contract for both subsequent commit and rollback calls).
+     */
+    @IgniteToStringExclude
+    private volatile CompletableFuture<Void> implicitRollbackFut;
+
     /** State. */
     private final AtomicInteger state = new AtomicInteger(STATE_OPEN);
 
@@ -246,9 +256,13 @@ public class ClientTransaction implements Transaction {
         try {
             if (finishFut != null) {
                 return finishFut;
+            } else if (implicitRollbackFut == null) {
+                implicitRollbackFut = new CompletableFuture<>();
             } else {
-                finishFut = new CompletableFuture<>();
+                return implicitRollbackFut;
             }
+
+            setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
         } finally {
             enlistPartitionLock.writeLock().unlock();
         }
@@ -264,17 +278,19 @@ public class ClientTransaction implements Transaction {
 
         // It's safe to rollback proxy and direct parts of transactions 
concurrently.
         // Write intent resolution will ignore WIs from PENDING transactions.
-        return CompletableFuture.allOf(rollbackFut, 
sendDiscardRequests()).handle((r, e) -> {
-            setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+        return CompletableFuture.allOf(rollbackFut, 
sendDiscardRequests()).whenComplete((r, e) -> {
             ch.inflights().erase(txId());
-            this.finishFut.complete(null);
-            return null;
+            TransactionException ex = exceptionForState(state.get(), this);
+
+            if (e != null) {
+                ex.addSuppressed(e);
+            }
+
+            this.implicitRollbackFut.completeExceptionally(ex);
         });
     }
 
     private CompletableFuture<Void> sendDiscardRequests() {
-        assert finishFut != null;
-
         if 
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
             return nullCompletedFuture();
         }
@@ -326,6 +342,9 @@ public class ClientTransaction implements Transaction {
         try {
             if (finishFut != null) {
                 return finishFut;
+            } else if (implicitRollbackFut != null) {
+                finishFut = nullCompletedFuture();
+                return implicitRollbackFut;
             } else {
                 finishFut = new CompletableFuture<>();
             }
@@ -363,14 +382,14 @@ public class ClientTransaction implements Transaction {
                 // Failed to commit for some reason, need to discard direct 
mappings.
                 Throwable cause = ExceptionUtils.unwrapCause(e);
 
-                sendDiscardRequests().handle((r, e0) -> {
-                    setState(cause instanceof ClientTransactionKilledException 
? STATE_KILLED : STATE_ROLLED_BACK);
+                setState(cause instanceof ClientTransactionKilledException ? 
STATE_KILLED : STATE_ROLLED_BACK);
+
+                sendDiscardRequests().whenComplete((r, e0) -> {
                     ch.inflights().erase(txId());
                     this.finishFut.complete(null);
-                    return null;
                 });
 
-                return null;
+                throw sneakyThrow(e); // Fail commit future.
             }
 
             setState(STATE_COMMITTED);
@@ -420,9 +439,14 @@ public class ClientTransaction implements Transaction {
         try {
             if (finishFut != null) {
                 return finishFut;
+            } else if (implicitRollbackFut != null) {
+                finishFut = nullCompletedFuture();
+                return implicitRollbackFut;
             } else {
                 finishFut = new CompletableFuture<>();
             }
+
+            setState(STATE_ROLLED_BACK);
         } finally {
             enlistPartitionLock.writeLock().unlock();
         }
@@ -437,7 +461,6 @@ public class ClientTransaction implements Transaction {
         }, r -> null);
 
         mainFinishFut.handle((res, e) -> {
-            setState(STATE_ROLLED_BACK);
             ch.inflights().erase(txId());
             this.finishFut.complete(null);
             return null;
@@ -596,7 +619,7 @@ public class ClientTransaction implements Transaction {
     }
 
     private void checkEnlistPossible() {
-        if (finishFut != null) {
+        if (finishFut != null || implicitRollbackFut != null) {
             throw exceptionForState(state.get(), this);
         }
     }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
index 0366339f2d9..2167cf8f6ff 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/handlers/TxFinishReplicaRequestHandler.java
@@ -224,12 +224,22 @@ public class TxFinishReplicaRequestHandler {
                 .map(entry -> new EnlistedPartitionGroup(entry.getKey(), 
entry.getValue().tableIds()))
                 .collect(toList());
         return finishTransaction(enlistedPartitionGroups, txId, commit, 
commitTimestamp)
-                .thenCompose(txResult -> {
+                .thenApply(txResult -> {
                     boolean actualCommit = txResult.transactionState() == 
COMMITTED;
                     HybridTimestamp actualCommitTs = 
txResult.commitTimestamp();
 
-                    return txManager.cleanup(replicationGroupId, 
enlistedPartitions, actualCommit, actualCommitTs, txId)
-                            .thenApply(v -> txResult);
+                    try {
+                        txManager.cleanup(replicationGroupId, 
enlistedPartitions, actualCommit, actualCommitTs, txId)
+                                .whenComplete((r, e) -> {
+                                    if (e != null) {
+                                        LOG.warn("Failed to cleanup a 
transaction [id={}]", txId, e);
+                                    }
+                                });
+                    } catch (Exception e) {
+                        LOG.warn("Failed to cleanup a transaction [id={}]", 
txId, e);
+                    }
+
+                    return txResult;
                 });
     }
 
diff --git 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 0dd1905754a..b6621253226 100644
--- 
a/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ 
b/modules/placement-driver/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -72,6 +72,7 @@ import 
org.apache.ignite.internal.testframework.IgniteTestUtils;
 import org.apache.ignite.internal.testframework.flow.TestFlowUtils;
 import org.apache.ignite.internal.tx.InternalTransaction;
 import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.wrapper.Wrappers;
 import org.apache.ignite.table.Table;
 import org.apache.ignite.table.Tuple;
@@ -281,7 +282,7 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         rwTx.rollback();
 
-        
assertFalse(primaryIgnite.txManager().lockManager().locks(rwTx.id()).hasNext());
+        assertTrue(waitForCondition(() -> 
CollectionUtils.nullOrEmpty(primaryIgnite.txManager().lockManager().locks(rwTx.id())),
 2000));
         assertEquals(3, partitionStorage.pendingCursors() + 
hashIdxStorage.pendingCursors() + sortedIdxStorage.pendingCursors());
     }
 
diff --git 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
index f59d95f8215..867047cc7a5 100644
--- 
a/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
+++ 
b/modules/sql-engine/src/integrationTest/java/org/apache/ignite/internal/sql/engine/ItOrToUnionRuleTest.java
@@ -86,6 +86,9 @@ public class ItOrToUnionRuleTest extends 
BaseSqlIntegrationTest {
                 {23, null, 0, null, 41, null},
         });
 
+        // TODO IGNITE-28578
+        assertQuery("select count(*) from products").returns(23L).check();
+
         gatherStatistics();
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
index a5de447c11e..2048fcb073a 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutur
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_WITH_EXCEPTION_ERR;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.is;
@@ -31,7 +32,6 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -150,7 +150,7 @@ public abstract class ItTxAbstractDistributedTestSingleNode 
extends TxAbstractTe
         assertThat(futFinishes, willSucceedFast());
         assertThat(futEnlists, willSucceedFast());
 
-        
assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
+        await().atMost(2, TimeUnit.SECONDS).until(() -> 
CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
     }
 
     /**
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
index a68006cc8e0..c5bd9bf2765 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNodeNoCleanupMessage.java
@@ -206,6 +206,7 @@ public class ItTxDistributedTestSingleNodeNoCleanupMessage 
extends TxAbstractTes
                         txManager,
                         txManager.lockManager(),
                         Runnable::run,
+                        Runnable::run,
                         replicationGroupId,
                         tableId,
                         indexesLockers,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
index a541025d562..68c00522e1d 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TablePartitionResourcesFactory.java
@@ -285,6 +285,7 @@ class TablePartitionResourcesFactory {
                 txManager,
                 lockManager,
                 scanRequestExecutor,
+                partitionOperationsExecutor,
                 replicationGroupId,
                 table.tableId(),
                 table.indexesLockers(partitionIndex),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
index 96ede9bae0f..127aca54b76 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/DefaultTablePartitionReplicaProcessor.java
@@ -52,6 +52,7 @@ import static 
org.apache.ignite.internal.tx.impl.TxStateResolutionParameters.txS
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.CollectionUtils.view;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
@@ -83,10 +84,15 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
+import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTuple;
@@ -199,8 +205,10 @@ import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.LockMode;
 import 
org.apache.ignite.internal.tx.OutdatedReadOnlyTransactionInternalException;
 import 
org.apache.ignite.internal.tx.PrimaryReplicaChangeDuringWriteIntentResolutionException;
+import org.apache.ignite.internal.tx.TransactionIds;
 import org.apache.ignite.internal.tx.TransactionMeta;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.TxPriority;
 import org.apache.ignite.internal.tx.TxState;
 import org.apache.ignite.internal.tx.TxStateMeta;
 import org.apache.ignite.internal.tx.UpdateCommandResult;
@@ -254,6 +262,8 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     @SuppressWarnings("unused") // We use it as a placeholder of a 
documentation which can be linked using # and @see.
     private static final Object INTERNAL_DOC_PLACEHOLDER = null;
 
+    private static final int WAIT_FOR_CLEANUP_TIMEOUT_MILLIS = 500;
+
     /** Logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(DefaultTablePartitionReplicaProcessor.class);
 
@@ -304,6 +314,8 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     /** Runs async scan tasks for effective tail recursion execution (avoid 
deep recursive calls). */
     private final Executor scanRequestExecutor;
 
+    private final Executor partitionOperationsExecutor;
+
     private final Supplier<Int2ObjectMap<IndexLocker>> indexesLockers;
 
     /** Used to handle race between concurrent rollback and enlist. */
@@ -312,6 +324,9 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     /** Cleanup futures. */
     private final ConcurrentHashMap<RowId, CompletableFuture<?>> rowCleanupMap 
= new ConcurrentHashMap<>();
 
+    /** Pending transactions. */
+    private final ConcurrentNavigableMap<UUID, PendingTxContext> 
pendingTransactions = new ConcurrentSkipListMap<>();
+
     private final SchemaCompatibilityValidator schemaCompatValidator;
 
     private final SchemaSyncService schemaSyncService;
@@ -350,6 +365,8 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
      * @param raftCommandRunner Raft client.
      * @param txManager Transaction manager.
      * @param lockManager Lock manager.
+     * @param scanRequestExecutor Scan executor.
+     * @param partitionOperationsExecutor Partition executor.
      * @param replicationGroupId Replication group id.
      * @param tableId Table id.
      * @param indexesLockers Index lock helper objects.
@@ -375,6 +392,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
             TxManager txManager,
             LockManager lockManager,
             Executor scanRequestExecutor,
+            Executor partitionOperationsExecutor,
             ZonePartitionId replicationGroupId,
             int tableId,
             Supplier<Int2ObjectMap<IndexLocker>> indexesLockers,
@@ -401,6 +419,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
         this.txManager = txManager;
         this.lockManager = lockManager;
         this.scanRequestExecutor = scanRequestExecutor;
+        this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.indexesLockers = indexesLockers;
         this.pkIndexStorage = pkIndexStorage;
         this.secondaryIndexStorages = secondaryIndexStorages;
@@ -560,7 +579,8 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
         }
 
         if (request instanceof GetEstimatedSizeRequest) {
-            return processGetEstimatedSizeRequest();
+            return resolvePendingTransactions(((GetEstimatedSizeRequest) 
request).timestamp(), safeTime.current(),
+                    unused -> completedFuture(mvDataStorage.estimatedSize()));
         }
 
         HybridTimestamp opTs = 
tableAwareReplicaRequestPreProcessor.getOperationTimestamp(request);
@@ -569,10 +589,6 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
         return processOperationRequestWithTxOperationManagementLogic(request, 
replicaPrimacy, opTsIfDirectRo);
     }
 
-    private CompletableFuture<Long> processGetEstimatedSizeRequest() {
-        return completedFuture(mvDataStorage.estimatedSize());
-    }
-
     private void replicaTouch(UUID txId, UUID coordinatorId, ZonePartitionId 
commitPartitionId, @Nullable String txLabel) {
         txManager.updateTxMeta(txId, old -> builder(old, PENDING)
                 .txCoordinatorId(coordinatorId)
@@ -581,6 +597,80 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
                 .build());
     }
 
+    private <T> CompletableFuture<T> resolvePendingTransactions(
+            HybridTimestamp requestTime,
+            HybridTimestamp currentSafeTime,
+            Function<HybridTimestamp, CompletableFuture<T>> action
+    ) {
+        // Wait for currentSafeTime >= requestTime to avoid out-of-order 
transaction arrival.
+        if (currentSafeTime.compareTo(requestTime) < 0) {
+            return safeTime.waitFor(requestTime)
+                    .thenComposeAsync(
+                            unused -> resolvePendingTransactions(requestTime, 
safeTime.current(), action),
+                            partitionOperationsExecutor);
+        }
+
+        assert currentSafeTime.compareTo(requestTime) >= 0 : "currentSafeTime 
< lowerBoundTimestamp";
+        assert currentSafeTime.compareTo(safeTime.current()) <= 0 : 
"currentSafeTime > safeTime";
+
+        // Stable committed snapshot is ensured after resolving pending 
transactions state.
+        UUID upperBoundTxId = TransactionIds.transactionId(currentSafeTime, 
Integer.MAX_VALUE, TxPriority.NORMAL);
+        ConcurrentNavigableMap<UUID, PendingTxContext> txToWait = 
pendingTransactions.headMap(upperBoundTxId, true);
+
+        if (!txToWait.isEmpty()) {
+            List<CompletableFuture<?>> futs = null;
+
+            for (Map.Entry<UUID, PendingTxContext> entry : 
txToWait.entrySet()) {
+                if (!entry.getValue().cleanupFut.isDone()) {
+                    futs = futs == null ? new ArrayList<>() : futs;
+                    futs.add(resolveTransactionState(entry.getKey(), 
entry.getValue(), currentSafeTime));
+                }
+            }
+
+            if (futs != null) {
+                return allOf(futs.toArray(CompletableFuture[]::new))
+                        .thenComposeAsync(unused -> 
action.apply(currentSafeTime), partitionOperationsExecutor);
+            }
+        }
+
+        return action.apply(currentSafeTime);
+    }
+
+    private CompletableFuture<Void> resolveTransactionState(UUID txId, 
PendingTxContext txCtx, HybridTimestamp observableTimestamp) {
+        CompletableFuture<Void> resFut = new CompletableFuture<>();
+
+        txCtx.cleanupFut.whenComplete(copyStateTo(resFut));
+
+        return resFut.orTimeout(WAIT_FOR_CLEANUP_TIMEOUT_MILLIS, 
TimeUnit.MILLISECONDS).handle((ignored, e) -> {
+            if (e == null) {
+                return CompletableFutures.<Void>nullCompletedFuture();
+            }
+
+            if (e instanceof TimeoutException) {
+                // Transaction was not cleaned up in time.
+                // Send a tx state request to the coordinator to bump the commit 
timestamp of an active txn beyond the safe timestamp.
+                return 
transactionStateResolver.resolveTxState(txStateResolutionParameters()
+                                .txId(txId)
+                                .commitGroupId(txCtx.commitPartId)
+                                .readTimestamp(observableTimestamp)
+                                .build())
+                        .thenCompose(r -> {
+                            if (r.txState() == PENDING) {
+                                // Safe to ignore this transaction, it will be 
committed with a higher timestamp.
+                                return nullCompletedFuture();
+                            }
+
+                            // Wait for the original future, relying on durable 
finish to do the cleanup in case of failures.
+                            return txCtx.cleanupFut;
+                        });
+            }
+
+            sneakyThrow(e);
+
+            return null;
+        }).thenCompose(Function.identity());
+    }
+
     private static void setDelayedAckProcessor(@Nullable ReplicaResult result, 
@Nullable BiConsumer<Object, Throwable> proc) {
         if (result != null) {
             result.delayedAckProcessor = proc;
@@ -621,6 +711,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     private CompletableFuture<?> 
processRwSingleRowRequest(ReadWriteSingleRowReplicaRequest req, ReplicaPrimacy 
primacy) {
         return appendTxCommand(
                 req.transactionId(),
+                req.commitPartitionId().asZonePartitionId(),
                 req.requestType(),
                 req.full(),
                 () -> processSingleEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
@@ -631,6 +722,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     private CompletableFuture<?> 
processRwSingleRowPkRequest(ReadWriteSingleRowPkReplicaRequest req, 
ReplicaPrimacy primacy) {
         return appendTxCommand(
                 req.transactionId(),
+                req.commitPartitionId().asZonePartitionId(),
                 req.requestType(),
                 req.full(),
                 () -> processSingleEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
@@ -641,6 +733,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     private CompletableFuture<?> 
processRwMultiRowRequest(ReadWriteMultiRowReplicaRequest req, ReplicaPrimacy 
primacy) {
         return appendTxCommand(
                 req.transactionId(),
+                req.commitPartitionId().asZonePartitionId(),
                 req.requestType(),
                 req.full(),
                 () -> processMultiEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
@@ -651,6 +744,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     private CompletableFuture<?> 
processRwMultiRowPkRequest(ReadWriteMultiRowPkReplicaRequest req, 
ReplicaPrimacy primacy) {
         return appendTxCommand(
                 req.transactionId(),
+                req.commitPartitionId().asZonePartitionId(),
                 req.requestType(),
                 req.full(),
                 () -> processMultiEntryAction(req, 
primacy.leaseStartTime()).whenComplete(
@@ -661,6 +755,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
     private CompletableFuture<?> 
processRwSwapRowRequest(ReadWriteSwapRowReplicaRequest req, ReplicaPrimacy 
primacy) {
         return appendTxCommand(
                 req.transactionId(),
+                req.commitPartitionId().asZonePartitionId(),
                 req.requestType(),
                 req.full(),
                 () -> processTwoEntriesAction(req, 
primacy.leaseStartTime()).whenComplete(
@@ -677,7 +772,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
         replicaTouch(req.transactionId(), req.coordinatorId(), 
req.commitPartitionId().asZonePartitionId(), req.txLabel());
 
         // Implicit RW scan can be committed locally on a last batch or error.
-        return appendTxCommand(req.transactionId(), RW_SCAN, false, () -> 
processScanRetrieveBatchAction(req))
+        return appendTxCommand(req.transactionId(), null, RW_SCAN, false, () 
-> processScanRetrieveBatchAction(req))
                 .thenCompose(rows -> {
                     if (allElementsAreNull(rows)) {
                         return completedFuture(rows);
@@ -1534,6 +1629,14 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
                         applyWriteIntentSwitchCommandLocally(request);
                     }
 
+                    partitionOperationsExecutor.execute(() -> {
+                        PendingTxContext txCtx = 
pendingTransactions.remove(request.txId());
+
+                        if (txCtx != null) {
+                            txCtx.cleanupFut.complete(null);
+                        }
+                    });
+
                     return new ReplicaResult(res, null);
                 });
     }
@@ -1669,6 +1772,7 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
      * Appends an operation to prevent the race between commit/rollback and 
the operation execution.
      *
      * @param txId Transaction id.
+     * @param commitPartitionId Commit partition id.
      * @param requestType Request type.
      * @param full {@code True} if a full transaction and can be immediately 
committed.
      * @param op Operation closure.
@@ -1676,10 +1780,15 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
      */
     private <T> CompletableFuture<T> appendTxCommand(
             UUID txId,
+            @Nullable ZonePartitionId commitPartitionId,
             RequestType requestType,
             boolean full,
             Supplier<CompletableFuture<T>> op
     ) {
+        PendingTxContext txCtx = requestType.isRwRead() ? null :
+                pendingTransactions.computeIfAbsent(txId,
+                        id -> new PendingTxContext(new CompletableFuture<>(), 
commitPartitionId));
+
         if (full) {
             AtomicReference<CompletableFuture<T>> futRef = new 
AtomicReference<>();
 
@@ -1690,6 +1799,14 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
                 releaseTxLocks(txId);
                 // Drop volatile state.
                 txManager.updateTxMeta(txId, ignored -> null);
+
+                partitionOperationsExecutor.execute(() -> {
+                    // We don't care about errors for pending tx purposes.
+                    if (!requestType.isRwRead()) {
+                        txCtx.cleanupFut.complete(null);
+                        pendingTransactions.remove(txId);
+                    }
+                });
             });
         }
 
@@ -1714,6 +1831,9 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
             Throwable publicCause = isFinishedDueToError ? cause : null;
             Integer causeErrorCode = txStateMeta == null ? null : 
txStateMeta.lastExceptionErrorCode();
 
+            // Reduce heap usage.
+            pendingTransactions.remove(txId);
+
             return failedFuture(new TransactionException(
                     finishedTransactionErrorCode(isFinishedDueToTimeout, 
isFinishedDueToError),
                     format(finishedTransactionErrorMessage(
@@ -4085,4 +4205,20 @@ public class DefaultTablePartitionReplicaProcessor 
implements TablePartitionRepl
             this.transactionMeta = transactionMeta;
         }
     }
+
+    /**
+     * Represents the context of a transaction that is waiting for cleanup.
+     */
+    private static class PendingTxContext {
+        final CompletableFuture<Void> cleanupFut;
+        final ZonePartitionId commitPartId;
+
+        PendingTxContext(CompletableFuture<Void> cleanupFut, ZonePartitionId 
commitPartId) {
+            assert cleanupFut != null;
+            assert commitPartId != null;
+
+            this.cleanupFut = cleanupFut;
+            this.commitPartId = commitPartId;
+        }
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
index a5760940cb0..340dcb33ced 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorIndexLockingTest.java
@@ -264,6 +264,7 @@ public class 
DefaultTablePartitionReplicaProcessorIndexLockingTest extends Ignit
                 newTxManager(),
                 LOCK_MANAGER,
                 Runnable::run,
+                Runnable::run,
                 new ZonePartitionId(ZONE_ID, PART_ID),
                 TABLE_ID,
                 () -> Int2ObjectMap.ofEntries(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
index 4c429dc7d2b..39b2eb07231 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorSortedIndexLockingTest.java
@@ -234,6 +234,7 @@ public class 
DefaultTablePartitionReplicaProcessorSortedIndexLockingTest extends
                 newTxManager(),
                 LOCK_MANAGER,
                 Runnable::run,
+                Runnable::run,
                 new ZonePartitionId(ZONE_ID, PART_ID),
                 TABLE_ID,
                 () -> Int2ObjectMaps.singleton(pkLocker.id(), pkLocker),
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
index db7cdb543aa..453235e9db2 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/DefaultTablePartitionReplicaProcessorTest.java
@@ -663,6 +663,7 @@ public class DefaultTablePartitionReplicaProcessorTest 
extends IgniteAbstractTes
                 txManager,
                 lockManager,
                 Runnable::run,
+                Runnable::run,
                 new ZonePartitionId(tableDescriptor.zoneId(), PART_ID),
                 TABLE_ID,
                 () -> Int2ObjectMap.ofEntries(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 202bf8cdf8e..58d8b06a7df 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -668,6 +668,7 @@ public class ZonePartitionReplicaListenerTest extends 
IgniteAbstractTest {
                 txManager,
                 lockManager,
                 Runnable::run,
+                Runnable::run,
                 new ZonePartitionId(tableDescriptor.zoneId(), PART_ID),
                 TABLE_ID,
                 () -> Int2ObjectMap.ofEntries(
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index cb0754e9567..5579a2b67a9 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -327,13 +327,14 @@ public class InternalTableEstimatedSizeTest extends 
BaseIgniteAbstractTest {
                 txManager,
                 lockManager,
                 ForkJoinPool.commonPool(),
+                ForkJoinPool.commonPool(),
                 new ZonePartitionId(ZONE_ID, partId),
                 TABLE_ID,
                 Int2ObjectMaps::emptyMap,
                 new Lazy<>(() -> null),
                 Int2ObjectMaps::emptyMap,
                 clockService,
-                new 
PendingComparableValuesTracker<>(HybridTimestamp.MIN_VALUE),
+                new 
PendingComparableValuesTracker<>(HybridTimestamp.MAX_VALUE),
                 transactionStateResolver,
                 storageUpdateHandler,
                 validationSchemasSource,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 007b0f5cea4..e0fecc0761a 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -1110,6 +1110,7 @@ public class ItTxTestCluster {
                 txManager,
                 txManager.lockManager(),
                 Runnable::run,
+                Runnable::run,
                 replicationGroupId,
                 tableId,
                 indexesLockers,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index ecea6c51af1..d57044f6adf 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -1893,7 +1893,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
     }
 
     @Test
-    public void testTransactionAlreadyCommitted() {
+    public void testTransactionAlreadyCommitted() throws InterruptedException {
         testTransactionAlreadyFinished(true, true, (transaction, uuid) -> {
             transaction.commit();
 
@@ -1902,7 +1902,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
     }
 
     @Test
-    public void testTransactionAlreadyRolledback() {
+    public void testTransactionAlreadyRolledback() throws InterruptedException 
{
         testTransactionAlreadyFinished(false, true, (transaction, uuid) -> {
             transaction.rollback();
 
@@ -2177,7 +2177,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
             boolean commit,
             boolean checkLocks,
             BiConsumer<Transaction, UUID> finisher
-    ) {
+    ) throws InterruptedException {
         Transaction tx = igniteTransactions.begin();
 
         var txId = ((ReadWriteTransactionImpl) tx).id();
@@ -2204,7 +2204,7 @@ public abstract class TxAbstractTest extends 
TxInfrastructureTest {
         assertThrowsTxFinishedException(() -> accountsRv.upsert(tx, 
makeValue(2, 300.)));
 
         if (checkLocks) {
-            
assertTrue(CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)));
+            assertTrue(waitForCondition(() -> 
CollectionUtils.nullOrEmpty(txManager(accounts).lockManager().locks(txId)), 
2000));
         }
 
         if (commit) {
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index de75440356a..3097c49cf0b 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -486,6 +486,7 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                 this.txManager,
                 this.txManager.lockManager(),
                 Runnable::run,
+                Runnable::run,
                 zonePartitionId,
                 tableId,
                 () -> Int2ObjectMaps.singleton(pkLocker.id(), pkLocker),
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 277da8a2a92..6973594f3ef 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -235,7 +235,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
      * @param commit Commit flag.
      * @param executionTimestamp The timestamp is the time when the 
transaction is applied to the remote node.
      * @param full Full state transaction marker.
-     * @param isComplete The flag is true if the transaction is completed 
through the public API, false for {@link this#kill()} invocation.
+     * @param isFinishedDirectly {@code True} if the transaction is completed 
through the public API, false for {@link #kill()}.
      * @param finishReason Optional finish reason (for example, timeout). Must 
be {@code null} for commit.
      * @return The future.
      */
@@ -243,7 +243,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
             boolean commit,
             @Nullable HybridTimestamp executionTimestamp,
             boolean full,
-            boolean isComplete,
+            boolean isFinishedDirectly,
             @Nullable Throwable finishReason
     ) {
         enlistPartitionLock.writeLock().lock();
@@ -251,7 +251,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
         try {
             if (finishFuture == null) {
                 if (killed) {
-                    if (isComplete) {
+                    if (isFinishedDirectly) {
                         // An attempt to finish a killed transaction.
                         finishFuture = nullCompletedFuture();
 
@@ -266,7 +266,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                     CompletableFuture<Void> finishFutureInternal =
                             txManager.finishFull(observableTsTracker, id(), 
executionTimestamp, commit, finishReason);
 
-                    if (isComplete) {
+                    if (isFinishedDirectly) {
                         finishFuture = finishFutureInternal.handle((unused, 
throwable) -> null);
                         this.timeoutExceeded = 
isFinishedDueToTimeout(finishReason);
                     } else {
@@ -278,7 +278,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
 
                     return finishFutureInternal;
                 } else {
-                    killed = !isComplete;
+                    killed = !isFinishedDirectly;
 
                     CompletableFuture<Void> finishFutureInternal = 
txManager.finish(
                             observableTsTracker,
@@ -291,7 +291,7 @@ public class ReadWriteTransactionImpl extends 
IgniteAbstractTransactionImpl {
                             id()
                     );
 
-                    if (isComplete) {
+                    if (isFinishedDirectly) {
                         finishFuture = finishFutureInternal.handle((unused, 
throwable) -> null);
                         this.timeoutExceeded = 
isFinishedDueToTimeout(finishReason);
                     } else {


Reply via email to