This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 20359d3a12 IGNITE-21540 Handle lock exception for transaction 
operations (#3462)
20359d3a12 is described below

commit 20359d3a12dc686ab03959b814eb4458d25633c6
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Mar 22 13:24:42 2024 +0300

    IGNITE-21540 Handle lock exception for transaction operations (#3462)
---
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   1 +
 .../exec/rel/TableScanNodeExecutionTest.java       |   4 +-
 .../ItTxAbstractDistributedTestSingleNode.java     |  44 ++++
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +
 .../ignite/internal/table/ItColocationTest.java    |   4 +-
 .../internal/table/ItOperationRetryTest.java       | 132 ++++++++++++
 .../internal/table/ItTransactionRecoveryTest.java  |   7 +-
 .../internal/table/distributed/TableManager.java   |  16 +-
 .../replicator/PartitionReplicaListener.java       |  93 ++++++--
 .../distributed/storage/InternalTableImpl.java     | 234 ++++++++++++++-------
 .../distributed/TableManagerRecoveryTest.java      |   4 +
 .../table/distributed/TableManagerTest.java        |   5 +
 .../distributed/storage/InternalTableImplTest.java |   8 +-
 .../apache/ignite/distributed/ItTxTestCluster.java |   4 +-
 .../table/impl/DummyInternalTableImpl.java         |   4 +-
 .../TransactionConfigurationSchema.java            |  10 +
 17 files changed, 475 insertions(+), 99 deletions(-)

diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a536f060d4..5e8d364097 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -503,6 +503,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
         );
 
         GcConfiguration gcConfig = 
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
+        TransactionConfiguration txConfiguration = 
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
 
         var clockWaiter = new ClockWaiter(name, hybridClock);
 
@@ -551,6 +552,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 name,
                 registry,
                 gcConfig,
+                txConfiguration,
                 storageUpdateConfiguration,
                 messagingServiceReturningToStorageOperationsPool,
                 clusterSvc.topologyService(),
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8ea348e858..a91ff35fb0 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -711,6 +711,7 @@ public class IgniteImpl implements Ignite {
                 name,
                 registry,
                 gcConfig,
+                txConfig,
                 storageUpdateConfiguration,
                 messagingServiceReturningToStorageOperationsPool,
                 clusterSvc.topologyService(),
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 6aa22be45f..04ef25807c 100644
--- 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -253,7 +253,9 @@ public class TableScanNodeExecutionTest extends 
AbstractExecutionTest<Object[]>
                             Int2ObjectMaps.singleton(0, 
mock(RaftGroupService.class)),
                             new 
SingleClusterNodeResolver(mock(ClusterNode.class))
                     ),
-                    mock(TransactionInflights.class)
+                    mock(TransactionInflights.class),
+                    3_000,
+                    0
             );
             this.dataAmount = dataAmount;
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
index b9dce5b0cc..e3688ca88e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
@@ -18,12 +18,19 @@
 package org.apache.ignite.distributed;
 
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ThreadLocalRandom;
@@ -34,6 +41,7 @@ import org.apache.ignite.tx.Transaction;
 import org.apache.ignite.tx.TransactionException;
 import org.apache.ignite.tx.TransactionOptions;
 import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 
 /**
@@ -166,6 +174,42 @@ public abstract class 
ItTxAbstractDistributedTestSingleNode extends TxAbstractTe
         assertThat(futFinishes, willSucceedFast());
     }
 
+    @Test
+    public void testImplicitTransactionRetry() {
+        var rv = accounts.recordView();
+
+        Transaction tx = igniteTransactions.begin();
+
+        assertNull(rv.get(tx, makeKey(1)));
+
+        CompletableFuture<Void> implicitOpFut = runAsync(() -> rv.upsert(null, 
makeValue(1, 1.)));
+
+        assertFalse(implicitOpFut.isDone());
+
+        tx.commit();
+
+        assertThat(implicitOpFut, willCompleteSuccessfully());
+
+        assertNotNull(rv.get(null, makeKey(1)));
+    }
+
+    @Test
+    public void testImplicitTransactionTimeout() {
+        var rv = accounts.recordView();
+
+        Transaction tx = igniteTransactions.begin();
+
+        assertNull(rv.get(tx, makeKey(1)));
+
+        CompletableFuture<Void> implicitOpFut = runAsync(() -> rv.upsert(null, 
makeValue(1, 1.)));
+
+        assertFalse(implicitOpFut.isDone());
+
+        assertThat(implicitOpFut, willThrow(TransactionException.class));
+
+        assertNull(rv.get(null, makeKey(1)));
+    }
+
     /**
      * Finish the tx.
      *
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index c1e7d69085..b9417b5f18 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1135,6 +1135,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     
metaStorageManager.registerRevisionUpdateListener(function::apply);
 
             GcConfiguration gcConfig = 
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
+            TransactionConfiguration txConfig = 
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
 
             DataStorageModules dataStorageModules = new 
DataStorageModules(List.of(
                     new PersistentPageMemoryDataStorageModule(),
@@ -1193,6 +1194,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     name,
                     registry,
                     gcConfig,
+                    txConfig,
                     storageUpdateConfiguration,
                     clusterService.messagingService(),
                     clusterService.topologyService(),
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index d599540c39..4ab770b890 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -301,7 +301,9 @@ public class ItColocationTest extends 
BaseIgniteAbstractTest {
                 observableTimestampTracker,
                 new TestPlacementDriver(clusterNode),
                 new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new 
SingleClusterNodeResolver(clusterNode)),
-                transactionInflights
+                transactionInflights,
+                3_000,
+                0
         );
     }
 
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
new file mode 100644
index 0000000000..f329a5767e
--- /dev/null
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.network.DefaultMessagingService;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests of transaction operation retry.
+ */
+public class ItOperationRetryTest extends ClusterPerTestIntegrationTest {
+    /** Table name. */
+    private static final String TABLE_NAME = "test_table";
+
+    private static final int PART_ID = 0;
+
+    @BeforeEach
+    @Override
+    public void setup(TestInfo testInfo) throws Exception {
+        super.setup(testInfo);
+
+        String zoneSql = "create zone test_zone with partitions=1, replicas=3";
+        String sql = "create table " + TABLE_NAME + " (key int primary key, 
val varchar(20)) with primary_zone='TEST_ZONE'";
+
+        cluster.doInSession(0, session -> {
+            executeUpdate(zoneSql, session);
+            executeUpdate(sql, session);
+        });
+    }
+
+    @Test
+    public void testLockExceptionRetry() {
+        TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
+
+        String leaseholder = waitAndGetPrimaryReplica(node(0), 
tblReplicationGrp).getLeaseholder();
+
+        IgniteImpl leaseholderNode = findNodeByName(leaseholder);
+        IgniteImpl otherNode = findNode(0, initialNodes(), ignite -> 
!leaseholderNode.equals(ignite.name()));
+
+        log.info("Transactions are executed from a non-primary node [node={}, 
primary={}].", otherNode.name(), leaseholderNode.name());
+
+        RecordView<Tuple> view = 
otherNode.tables().table(TABLE_NAME).recordView();
+
+        Transaction tx1 = otherNode.transactions().begin();
+        Transaction tx2 = otherNode.transactions().begin();
+
+        view.get(tx1, Tuple.create().set("key", 1));
+
+        DefaultMessagingService messagingService = (DefaultMessagingService) 
leaseholderNode.clusterService().messagingService();
+
+        AtomicBoolean lockHold = new AtomicBoolean(true);
+
+        messagingService.dropMessages((nodeName, networkMessage) -> {
+            if (nodeName.equals(otherNode.name()) && networkMessage instanceof 
ReplicaResponse && lockHold.compareAndSet(true, false)) {
+                log.info("Exceptional response on lock acquisition 
[resp={}].", networkMessage.getClass().getSimpleName());
+
+                tx1.commit();
+            }
+
+            return false;
+        });
+
+        view.upsert(tx2, Tuple.create().set("key", 1).set("val", "new value"));
+
+        tx2.commit();
+
+        assertEquals("new value", view.get(null, Tuple.create().set("key", 
1)).value("val"));
+    }
+
+    private IgniteImpl findNodeByName(String leaseholder) {
+        return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
+    }
+
+    private IgniteImpl findNode(int startRange, int endRange, 
Predicate<IgniteImpl> filter) {
+        return IntStream.range(startRange, endRange)
+                .mapToObj(this::node)
+                .filter(filter::test)
+                .findFirst()
+                .get();
+    }
+
+    private ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node, 
ReplicationGroupId tblReplicationGrp) {
+        CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node.placementDriver().awaitPrimaryReplica(
+                tblReplicationGrp,
+                node.clock().now(),
+                10,
+                SECONDS
+        );
+
+        assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+        return primaryReplicaFut.join();
+    }
+}
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index e0c2877c48..16f057613e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -128,6 +128,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
         builder.clusterConfiguration("{\n"
                 + "  \"transaction\": {\n"
                 + "  \"abandonedCheckTs\": 600000\n"
+                + "  \"attemptsObtainLock\": 0\n"
                 + "  }\n"
                 + "}\n");
     }
@@ -745,7 +746,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
     public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse() 
throws Exception {
         TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
 
-        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
 
         String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
 
@@ -961,9 +962,7 @@ public class ItTransactionRecoveryTest extends 
ClusterPerTestIntegrationTest {
     public void testCursorsClosedAfterTxClose() throws Exception {
         TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
 
-        int partId = 0;
-
-        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), partId);
+        var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
 
         String leaseholder = waitAndGetPrimaryReplica(node(0), 
tblReplicationGrp).getLeaseholder();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0785fa5ce1..26e39c20da 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -198,6 +198,7 @@ import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.LockManager;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxMessageSender;
@@ -402,12 +403,18 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
     private final TransactionInflights transactionInflights;
 
+    private final TransactionConfiguration txCfg;
+
+    private long implicitTransactionTimeout;
+    private int attemptsObtainLock;
+
     /**
      * Creates a new table manager.
      *
      * @param nodeName Node name.
      * @param registry Registry for versioned values.
      * @param gcConfig Garbage collector configuration.
+     * @param txCfg Transaction configuration.
      * @param storageUpdateConfig Storage update handler configuration.
      * @param raftMgr Raft manager.
      * @param replicaMgr Replica manager.
@@ -435,6 +442,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
             String nodeName,
             Consumer<LongFunction<CompletableFuture<?>>> registry,
             GcConfiguration gcConfig,
+            TransactionConfiguration txCfg,
             StorageUpdateConfiguration storageUpdateConfig,
             MessagingService messagingService,
             TopologyService topologyService,
@@ -491,6 +499,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         this.lowWatermark = lowWatermark;
         this.asyncContinuationExecutor = asyncContinuationExecutor;
         this.transactionInflights = transactionInflights;
+        this.txCfg = txCfg;
 
         this.executorInclinedSchemaSyncService = new 
ExecutorInclinedSchemaSyncService(schemaSyncService, 
partitionOperationsExecutor);
         this.executorInclinedPlacementDriver = new 
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -603,6 +612,9 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
             partitionReplicatorNodeRecovery.start();
 
+            implicitTransactionTimeout = 
txCfg.implicitTransactionTimeout().value();
+            attemptsObtainLock = txCfg.attemptsObtainLock().value();
+
             return nullCompletedFuture();
         });
     }
@@ -1299,7 +1311,9 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 observableTimestampTracker,
                 executorInclinedPlacementDriver,
                 tableRaftService,
-                transactionInflights
+                transactionInflights,
+                implicitTransactionTimeout,
+                attemptsObtainLock
         );
 
         var table = new TableImpl(
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ab1610be37..bb0ff8d86e 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -433,7 +433,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         return validateTableExistence(request, opTsIfDirectRo)
                 .thenCompose(unused -> validateSchemaMatch(request, 
opTsIfDirectRo))
                 .thenCompose(unused -> waitForSchemasBeforeReading(request, 
opTsIfDirectRo))
-                .thenCompose(unused -> 
processOperationRequestWithTxRwCounter(request, isPrimary, opTsIfDirectRo));
+                .thenCompose(unused -> 
processOperationRequestWithTxRwCounter(senderId, request, isPrimary, 
opTsIfDirectRo));
     }
 
     /**
@@ -603,6 +603,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     private CompletableFuture<?> processOperationRequest(
+            String senderId,
             ReplicaRequest request,
             @Nullable Boolean isPrimary,
             @Nullable HybridTimestamp opStartTsIfDirectRo
@@ -610,23 +611,33 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         if (request instanceof ReadWriteSingleRowReplicaRequest) {
             var req = (ReadWriteSingleRowReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processSingleEntryAction(req));
+            var opId = new OperationId(senderId, req.timestampLong());
+
+            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processSingleEntryAction(req));
         } else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
             var req = (ReadWriteSingleRowPkReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processSingleEntryAction(req));
+            var opId = new OperationId(senderId, req.timestampLong());
+
+            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processSingleEntryAction(req));
         } else if (request instanceof ReadWriteMultiRowReplicaRequest) {
             var req = (ReadWriteMultiRowReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processMultiEntryAction(req));
+            var opId = new OperationId(senderId, req.timestampLong());
+
+            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processMultiEntryAction(req));
         } else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
             var req = (ReadWriteMultiRowPkReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processMultiEntryAction(req));
+            var opId = new OperationId(senderId, req.timestampLong());
+
+            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processMultiEntryAction(req));
         } else if (request instanceof ReadWriteSwapRowReplicaRequest) {
             var req = (ReadWriteSwapRowReplicaRequest) request;
 
-            return appendTxCommand(req.transactionId(), req.requestType(), 
req.full(), () -> processTwoEntriesAction(req));
+            var opId = new OperationId(senderId, req.timestampLong());
+
+            return appendTxCommand(req.transactionId(), opId, 
req.requestType(), req.full(), () -> processTwoEntriesAction(req));
         } else if (request instanceof 
ReadWriteScanRetrieveBatchReplicaRequest) {
             var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
 
@@ -642,8 +653,10 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                     null
             ));
 
+            var opId = new OperationId(senderId, req.timestampLong());
+
             // Implicit RW scan can be committed locally on a last batch or 
error.
-            return appendTxCommand(req.transactionId(), RequestType.RW_SCAN, 
false, () -> processScanRetrieveBatchAction(req))
+            return appendTxCommand(req.transactionId(), opId, 
RequestType.RW_SCAN, false, () -> processScanRetrieveBatchAction(req))
                     .thenCompose(rows -> {
                         if (allElementsAreNull(rows)) {
                             return completedFuture(rows);
@@ -1737,9 +1750,9 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
             txOps.futures.forEach((opType, futures) -> {
                 if (opType.isRwRead()) {
-                    txReadFutures.addAll(futures);
+                    txReadFutures.addAll(futures.values());
                 } else {
-                    txUpdateFutures.addAll(futures);
+                    txUpdateFutures.addAll(futures.values());
                 }
             });
 
@@ -1877,12 +1890,19 @@ public class PartitionReplicaListener implements 
ReplicaListener {
      * Appends an operation to prevent the race between commit/rollback and 
the operation execution.
      *
      * @param txId Transaction id.
+     * @param opId Operation id.
      * @param cmdType Command type.
      * @param full {@code True} if a full transaction and can be immediately 
committed.
      * @param op Operation closure.
      * @return A future object representing the result of the given operation.
      */
-    private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType 
cmdType, boolean full, Supplier<CompletableFuture<T>> op) {
+    private <T> CompletableFuture<T> appendTxCommand(
+            UUID txId,
+            OperationId opId,
+            RequestType cmdType,
+            boolean full,
+            Supplier<CompletableFuture<T>> op
+    ) {
         if (full) {
             return op.get().whenComplete((v, th) -> {
                 // Fast unlock.
@@ -1908,7 +1928,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
                 txOps = new TxCleanupReadyFutureList();
             }
 
-            txOps.futures.computeIfAbsent(cmdType, type -> new 
ArrayList<>()).add(cleanupReadyFut);
+            txOps.futures.computeIfAbsent(cmdType, type -> new 
HashMap<>()).put(opId, cleanupReadyFut);
 
             return txOps;
         });
@@ -3732,7 +3752,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         /**
          * Operation type is mapped operation futures.
          */
-        final Map<RequestType, List<CompletableFuture<?>>> futures = new 
EnumMap<>(RequestType.class);
+        final Map<RequestType, Map<OperationId, CompletableFuture<?>>> futures 
= new EnumMap<>(RequestType.class);
     }
 
     @Override
@@ -3822,6 +3842,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     }
 
     private CompletableFuture<?> processOperationRequestWithTxRwCounter(
+            String senderId,
             ReplicaRequest request,
             @Nullable Boolean isPrimary,
             @Nullable HybridTimestamp opStartTsIfDirectRo
@@ -3836,7 +3857,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
             }
         }
 
-        return processOperationRequest(request, isPrimary, opStartTsIfDirectRo)
+        return processOperationRequest(senderId, request, isPrimary, 
opStartTsIfDirectRo)
                 .whenComplete((unused, throwable) -> {
                     if (request instanceof ReadWriteReplicaRequest) {
                         txRwOperationTracker.decrementOperationCount(
@@ -3922,4 +3943,50 @@ public class PartitionReplicaListener implements 
ReplicaListener {
     private @Nullable BinaryRow upgrage(@Nullable BinaryRow source, int 
targetSchemaVersion) {
         return source == null ? null : new BinaryRowUpgrader(schemaRegistry, 
targetSchemaVersion).upgrade(source);
     }
+
+    /**
+     * Operation unique identifier.
+     */
+    private static class OperationId {
+        /** Operation node initiator id. */
+        private String initiatorId;
+
+        /** Timestamp. */
+        private long ts;
+
+        /**
+         * The constructor.
+         *
+         * @param initiatorId Sender node id.
+         * @param ts Timestamp.
+         */
+        public OperationId(String initiatorId, long ts) {
+            this.initiatorId = initiatorId;
+            this.ts = ts;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+
+            OperationId that = (OperationId) o;
+
+            if (ts != that.ts) {
+                return false;
+            }
+            return initiatorId.equals(that.initiatorId);
+        }
+
+        @Override
+        public int hashCode() {
+            int result = initiatorId.hashCode();
+            result = 31 * result + (int) (ts ^ (ts >>> 32));
+            return result;
+        }
+    }
 }
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 68446acc84..9505e2c3b4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -29,6 +29,7 @@ import static 
org.apache.ignite.internal.table.distributed.storage.RowBatch.allR
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
+import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
@@ -175,6 +176,12 @@ public class InternalTableImpl implements InternalTable {
     /** Table raft service. */
     private final TableRaftServiceImpl tableRaftService;
 
+    /** Implicit transaction timeout. */
+    private final long implicitTransactionTimeout;
+
+    /** Attempts to take lock. */
+    private final int attemptsObtainLock;
+
     /**
      * Constructor.
      *
@@ -190,6 +197,8 @@ public class InternalTableImpl implements InternalTable {
      * @param placementDriver Placement driver.
      * @param tableRaftService Table raft service.
      * @param transactionInflights Transaction inflights.
+     * @param implicitTransactionTimeout Implicit transaction timeout.
+     * @param attemptsObtainLock Attempts to take lock.
      */
     public InternalTableImpl(
             String tableName,
@@ -204,7 +213,9 @@ public class InternalTableImpl implements InternalTable {
             HybridTimestampTracker observableTimestampTracker,
             PlacementDriver placementDriver,
             TableRaftServiceImpl tableRaftService,
-            TransactionInflights transactionInflights
+            TransactionInflights transactionInflights,
+            long implicitTransactionTimeout,
+            int attemptsObtainLock
     ) {
         this.tableName = tableName;
         this.tableId = tableId;
@@ -220,6 +231,8 @@ public class InternalTableImpl implements InternalTable {
         this.placementDriver = placementDriver;
         this.tableRaftService = tableRaftService;
         this.transactionInflights = transactionInflights;
+        this.implicitTransactionTimeout = implicitTransactionTimeout;
+        this.attemptsObtainLock = attemptsObtainLock;
     }
 
     /** {@inheritDoc} */
@@ -265,7 +278,25 @@ public class InternalTableImpl implements InternalTable {
      * @param tx The transaction, not null if explicit.
      * @param fac Replica requests factory.
      * @param noWriteChecker Used to handle operations producing no updates.
-     * @param retryOnLockConflict {@code True} to retry on lock conflict.
+     * @return The future.
+     */
+    private <R> CompletableFuture<R> enlistInTx(
+            BinaryRowEx row,
+            @Nullable InternalTransaction tx,
+            IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, 
ReplicaRequest> fac,
+            BiPredicate<R, ReplicaRequest> noWriteChecker
+    ) {
+        return enlistInTx(row, tx, fac, noWriteChecker, null);
+    }
+
+    /**
+     * Enlists a single row into a transaction.
+     *
+     * @param row The row.
+     * @param tx The transaction, not null if explicit.
+     * @param fac Replica requests factory.
+     * @param noWriteChecker Used to handle operations producing no updates.
+     * @param txStartTs Transaction start time or {@code null}. This parameter 
is used only for retry.
      * @return The future.
      */
     private <R> CompletableFuture<R> enlistInTx(
@@ -273,7 +304,7 @@ public class InternalTableImpl implements InternalTable {
             @Nullable InternalTransaction tx,
             IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long, 
ReplicaRequest> fac,
             BiPredicate<R, ReplicaRequest> noWriteChecker,
-            boolean retryOnLockConflict
+            @Nullable Long txStartTs
     ) {
         // Check whether proposed tx is read-only. Complete future 
exceptionally if true.
         // Attempting to enlist a read-only in a read-write transaction does 
not corrupt the transaction itself, thus read-write transaction
@@ -308,20 +339,55 @@ public class InternalTableImpl implements InternalTable {
                     false,
                     primaryReplicaAndConsistencyToken,
                     noWriteChecker,
-                    retryOnLockConflict
+                    attemptsObtainLock
             );
         } else {
-            fut = enlistWithRetry(
+            fut = enlistAndInvoke(
                     actualTx,
                     partId,
                     enlistmentConsistencyToken -> fac.apply(actualTx, 
partGroupId, enlistmentConsistencyToken),
                     implicit,
-                    noWriteChecker,
-                    retryOnLockConflict
+                    noWriteChecker
             );
         }
 
-        return postEnlist(fut, false, actualTx, implicit);
+        return postEnlist(fut, false, actualTx, implicit).handle((r, e) -> {
+            if (e != null) {
+                if (implicit) {
+                    long ts = (txStartTs == null) ? 
actualTx.startTimestamp().getPhysical() : txStartTs;
+
+                    if (isRestartTransactionPossible(e) && 
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
+                        return enlistInTx(row, null, fac, noWriteChecker, ts);
+                    }
+                }
+
+                throw wrapReplicationException(e);
+            }
+
+            return completedFuture(r);
+        }).thenCompose(x -> x);
+    }
+
+    /**
+     * Enlists a single row into a transaction.
+     *
+     * @param keyRows Rows.
+     * @param tx The transaction.
+     * @param fac Replica requests factory.
+     * @param reducer Transform reducer.
+     * @param noOpChecker Used to handle no-op operations (producing no 
updates).
+     * @return The future.
+     */
+    private <T> CompletableFuture<T> enlistInTx(
+            Collection<BinaryRowEx> keyRows,
+            @Nullable InternalTransaction tx,
+            IgnitePentaFunction<
+                    Collection<? extends BinaryRow>, InternalTransaction, 
ReplicationGroupId, Long, Boolean, ReplicaRequest
+                    > fac,
+            Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
+            BiPredicate<T, ReplicaRequest> noOpChecker
+    ) {
+        return enlistInTx(keyRows, tx, fac, reducer, noOpChecker, null);
     }
 
     /**
@@ -332,7 +398,7 @@ public class InternalTableImpl implements InternalTable {
      * @param fac Replica requests factory.
      * @param reducer Transform reducer.
      * @param noOpChecker Used to handle no-op operations (producing no 
updates).
-     * @param retryOnLockConflict {@code True} to retry on lock conflict.
+     * @param txStartTs Transaction start time or {@code null}. This parameter 
is used only for retry.
      * @return The future.
      */
     private <T> CompletableFuture<T> enlistInTx(
@@ -343,7 +409,7 @@ public class InternalTableImpl implements InternalTable {
                     > fac,
             Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
             BiPredicate<T, ReplicaRequest> noOpChecker,
-            boolean retryOnLockConflict
+            @Nullable Long txStartTs
     ) {
         // Check whether proposed tx is read-only. Complete future 
exceptionally if true.
         // Attempting to enlist a read-only in a read-write transaction does 
not corrupt the transaction itself, thus read-write transaction
@@ -386,17 +452,16 @@ public class InternalTableImpl implements InternalTable {
                         false,
                         primaryReplicaAndConsistencyToken,
                         noOpChecker,
-                        retryOnLockConflict
+                        attemptsObtainLock
                 );
             } else {
-                fut = enlistWithRetry(
+                fut = enlistAndInvoke(
                         actualTx,
                         partitionId,
                         enlistmentConsistencyToken ->
                                 fac.apply(rowBatch.requestedRows, actualTx, 
partGroupId, enlistmentConsistencyToken, full),
                         full,
-                        noOpChecker,
-                        retryOnLockConflict
+                        noOpChecker
                 );
             }
 
@@ -405,7 +470,21 @@ public class InternalTableImpl implements InternalTable {
 
         CompletableFuture<T> fut = 
reducer.apply(rowBatchByPartitionId.values());
 
-        return postEnlist(fut, implicit && !singlePart, actualTx, full);
+        return postEnlist(fut, implicit && !singlePart, actualTx, 
full).handle((r, e) -> {
+            if (e != null) {
+                if (implicit) {
+                    long ts = (txStartTs == null) ? 
actualTx.startTimestamp().getPhysical() : txStartTs;
+
+                    if (isRestartTransactionPossible(e) && 
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
+                        return enlistInTx(keyRows, null, fac, reducer, 
noOpChecker, ts);
+                    }
+                }
+
+                throw wrapReplicationException(e);
+            }
+
+            return completedFuture(r);
+        }).thenCompose(x -> x);
     }
 
     private InternalTransaction startImplicitRwTxIfNeeded(@Nullable 
InternalTransaction tx) {
@@ -468,7 +547,7 @@ public class InternalTableImpl implements InternalTable {
         if (primaryReplicaAndConsistencyToken != null) {
             fut = replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), 
mapFunc.apply(primaryReplicaAndConsistencyToken.get2()));
         } else {
-            fut = enlistWithRetry(tx, partId, mapFunc, false, null, false);
+            fut = enlistAndInvoke(tx, partId, mapFunc, false, null);
         }
 
         return postEnlist(fut, false, tx, false);
@@ -486,39 +565,25 @@ public class InternalTableImpl implements InternalTable {
     }
 
     /**
-     * Partition enlisting with retrying.
+     * Enlists a partition and invokes the replica.
      *
      * @param tx Internal transaction.
      * @param partId Partition number.
      * @param mapFunc Function to create replica request with new enlistment 
consistency token.
      * @param full {@code True} if is a full transaction.
      * @param noWriteChecker Used to handle operations producing no updates.
-     * @param retryOnLockConflict {@code True} to retry on lock conflict.
      * @return The future.
      */
-    private <R> CompletableFuture<R> enlistWithRetry(
+    private <R> CompletableFuture<R> enlistAndInvoke(
             InternalTransaction tx,
             int partId,
             Function<Long, ReplicaRequest> mapFunc,
             boolean full,
-            @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
-            boolean retryOnLockConflict
+            @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker
     ) {
-        return (CompletableFuture<R>) enlist(partId, tx)
+        return enlist(partId, tx)
                 .thenCompose(primaryReplicaAndConsistencyToken ->
-                        trackingInvoke(tx, partId, mapFunc, full, 
primaryReplicaAndConsistencyToken, noWriteChecker, retryOnLockConflict))
-                .handle((res0, e) -> {
-                    if (e != null) {
-                        // We can safely retry indefinitely on deadlock 
prevention.
-                        if (retryOnLockConflict && e.getCause() instanceof 
LockException) {
-                            return enlistWithRetry(tx, partId, mapFunc, full, 
noWriteChecker, true);
-                        }
-
-                        return failedFuture(e);
-                    }
-
-                    return completedFuture(res0);
-                }).thenCompose(x -> x);
+                        trackingInvoke(tx, partId, mapFunc, full, 
primaryReplicaAndConsistencyToken, noWriteChecker, attemptsObtainLock));
     }
 
     /**
@@ -540,7 +605,7 @@ public class InternalTableImpl implements InternalTable {
             boolean full,
             IgniteBiTuple<ClusterNode, Long> primaryReplicaAndConsistencyToken,
             @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
-            boolean retryOnLockConflict
+            int retryOnLockConflict
     ) {
         assert !tx.isReadOnly() : format("Tracking invoke is available only 
for read-write transactions [tx={}].", tx);
 
@@ -573,14 +638,27 @@ public class InternalTableImpl implements InternalTable {
                 }
 
                 return res;
-            }).exceptionally(e -> {
-                if (retryOnLockConflict && e.getCause() instanceof 
LockException) {
-                    transactionInflights.removeInflight(tx.id()); // Will be 
retried.
+            }).handle((r, e) -> {
+                if (e != null) {
+                    if (retryOnLockConflict > 0 && e.getCause() instanceof 
LockException) {
+                        transactionInflights.removeInflight(tx.id()); // Will 
be retried.
+
+                        return trackingInvoke(
+                                tx,
+                                partId,
+                                ignored -> request,
+                                full,
+                                primaryReplicaAndConsistencyToken,
+                                noWriteChecker,
+                                retryOnLockConflict - 1
+                        );
+                    }
+
+                    ExceptionUtils.sneakyThrow(e);
                 }
 
-                ExceptionUtils.sneakyThrow(e);
-                return null; // Unreachable.
-            });
+                return completedFuture(r);
+            }).thenCompose(x -> x);
         } else {
             return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), 
request);
         }
@@ -798,8 +876,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> false,
-                false
+                (res, req) -> false
         );
     }
 
@@ -877,8 +954,7 @@ public class InternalTableImpl implements InternalTable {
                 (keyRows0, txo, groupId, enlistmentConsistencyToken, full) ->
                         readWriteMultiRowPkReplicaRequest(RW_GET_ALL, 
keyRows0, txo, groupId, enlistmentConsistencyToken, full),
                 InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
-                (res, req) -> false,
-                false
+                (res, req) -> false
         );
     }
 
@@ -997,8 +1073,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> false,
-                false
+                (res, req) -> false
         );
     }
 
@@ -1010,8 +1085,7 @@ public class InternalTableImpl implements InternalTable {
                 tx,
                 this::upsertAllInternal,
                 RowBatch::allResultFutures,
-                (res, req) -> false,
-                false
+                (res, req) -> false
         );
     }
 
@@ -1021,13 +1095,12 @@ public class InternalTableImpl implements InternalTable 
{
         InternalTransaction tx = txManager.begin(observableTimestampTracker);
         TablePartitionId partGroupId = new TablePartitionId(tableId, 
partition);
 
-        CompletableFuture<Void> fut = enlistWithRetry(
+        CompletableFuture<Void> fut = enlistAndInvoke(
                 tx,
                 partition,
                 enlistmentConsistencyToken -> upsertAllInternal(rows, deleted, 
tx, partGroupId, enlistmentConsistencyToken, true),
                 true,
-                null,
-                true // Allow auto retries for data streamer.
+                null
         );
 
         return postEnlist(fut, false, tx, true); // Will be committed in one 
RTT.
@@ -1051,8 +1124,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> false,
-                false
+                (res, req) -> false
         );
     }
 
@@ -1074,8 +1146,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> !res,
-                false
+                (res, req) -> !res
         );
     }
 
@@ -1104,8 +1175,7 @@ public class InternalTableImpl implements InternalTable {
 
                     // All values are null, this means nothing was deleted.
                     return true;
-                },
-                false
+                }
         );
     }
 
@@ -1153,8 +1223,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> !res,
-                false
+                (res, req) -> !res
         );
     }
 
@@ -1180,8 +1249,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> !res,
-                false
+                (res, req) -> !res
         );
     }
 
@@ -1203,8 +1271,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> res == null,
-                false
+                (res, req) -> res == null
         );
     }
 
@@ -1226,8 +1293,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> !res,
-                false
+                (res, req) -> !res
         );
     }
 
@@ -1249,8 +1315,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> !res,
-                false
+                (res, req) -> !res
         );
     }
 
@@ -1272,8 +1337,7 @@ public class InternalTableImpl implements InternalTable {
                         .full(tx == null)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
-                (res, req) -> res == null,
-                false
+                (res, req) -> res == null
         );
     }
 
@@ -1295,8 +1359,7 @@ public class InternalTableImpl implements InternalTable {
 
                     // All values are null, this means nothing was deleted.
                     return true;
-                },
-                false
+                }
         );
     }
 
@@ -1329,8 +1392,7 @@ public class InternalTableImpl implements InternalTable {
 
                     // All values are null, this means nothing was deleted.
                     return true;
-                },
-                false
+                }
         );
     }
 
@@ -2162,4 +2224,26 @@ public class InternalTableImpl implements InternalTable {
         return readWriteMultiRowReplicaRequest(
                 RequestType.RW_UPSERT_ALL, keyRows0, deleted, txo, groupId, 
enlistmentConsistencyToken, full);
     }
+
+    /**
+     * Ensure that the exception allows you to restart a transaction.
+     *
+     * @param e Exception to check.
+     * @return True if retrying is possible, false otherwise.
+     */
+    private boolean isRestartTransactionPossible(Throwable e) {
+        if (e instanceof LockException) {
+            return true;
+        } else if (e instanceof TransactionException && e.getCause() 
instanceof LockException) {
+            return true;
+        } else if (e instanceof CompletionException && e.getCause() instanceof 
LockException) {
+            return true;
+        } else if (e instanceof CompletionException
+                && e.getCause() instanceof TransactionException
+                && e.getCause().getCause() instanceof LockException) {
+            return true;
+        }
+
+        return false;
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 2cc7e95082..3d9c5fcea5 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -110,6 +110,7 @@ import 
org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -149,6 +150,8 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
     @InjectConfiguration
     private GcConfiguration gcConfig;
     @InjectConfiguration
+    private TransactionConfiguration txConfig;
+    @InjectConfiguration
     private StorageUpdateConfiguration storageUpdateConfiguration;
     @WorkDirectory
     private Path workDir;
@@ -299,6 +302,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 NODE_NAME,
                 revisionUpdater,
                 gcConfig,
+                txConfig,
                 storageUpdateConfiguration,
                 clusterService.messagingService(),
                 clusterService.topologyService(),
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 00a4ce4480..5445984c3d 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -126,6 +126,7 @@ import 
org.apache.ignite.internal.testframework.IgniteAbstractTest;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
 import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -208,6 +209,9 @@ public class TableManagerTest extends IgniteAbstractTest {
     @InjectConfiguration
     private GcConfiguration gcConfig;
 
+    @InjectConfiguration
+    private TransactionConfiguration txConfig;
+
     /** Storage update configuration. */
     @InjectConfiguration
     private StorageUpdateConfiguration storageUpdateConfiguration;
@@ -753,6 +757,7 @@ public class TableManagerTest extends IgniteAbstractTest {
                 NODE_NAME,
                 revisionUpdater,
                 gcConfig,
+                txConfig,
                 storageUpdateConfiguration,
                 clusterService.messagingService(),
                 clusterService.topologyService(),
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index b8871d1007..0919a7c71a 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -72,7 +72,9 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 new HybridTimestampTracker(),
                 mock(PlacementDriver.class),
                 new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
-                mock(TransactionInflights.class)
+                mock(TransactionInflights.class),
+                3_000,
+                0
         );
 
         // Let's check the empty table.
@@ -119,7 +121,9 @@ public class InternalTableImplTest extends 
BaseIgniteAbstractTest {
                 new HybridTimestampTracker(),
                 mock(PlacementDriver.class),
                 new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(), 
new SingleClusterNodeResolver(mock(ClusterNode.class))),
-                mock(TransactionInflights.class)
+                mock(TransactionInflights.class),
+                3_000,
+                0
         );
 
         List<BinaryRowEx> originalRows = List.of(
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ee9fa70414..f9af509904 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -718,7 +718,9 @@ public class ItTxTestCluster {
                         timestampTracker,
                         placementDriver,
                         new TableRaftServiceImpl(tableName, 1, clients, 
nodeResolver),
-                        clientTransactionInflights
+                        clientTransactionInflights,
+                        500,
+                        0
                 ),
                 new DummySchemaManagerImpl(schemaDescriptor),
                 clientTxManager.lockManager(),
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index c9bba46a80..c998f5314d 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -246,7 +246,9 @@ public class DummyInternalTableImpl extends 
InternalTableImpl {
                         Int2ObjectMaps.singleton(PART_ID, 
mock(RaftGroupService.class)),
                         new SingleClusterNodeResolver(LOCAL_NODE)
                 ),
-                transactionInflights
+                transactionInflights,
+                3_000,
+                0
         );
 
         RaftGroupService svc = 
tableRaftService().partitionRaftGroupService(PART_ID);
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
index 57ee2c977b..84e447cb43 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -34,4 +34,14 @@ public class TransactionConfigurationSchema {
     @Range(min = 0)
     @Value(hasDefault = true)
     public final long abandonedCheckTs = DEFAULT_ABANDONED_CHECK_TS;
+
+    /** Timeout for implicit transactions. */
+    @Range(min = 0)
+    @Value(hasDefault = true)
+    public final long implicitTransactionTimeout = 3_000;
+
+    /** A transaction tries to take lock several times until it throws an 
exception {@lonk org.apache.ignite.tx.TransactionException}. */
+    @Range(min = 0)
+    @Value(hasDefault = true)
+    public final int attemptsObtainLock = 3;
 }

Reply via email to