This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 20359d3a12 IGNITE-21540 Handle lock exception for transaction
operations (#3462)
20359d3a12 is described below
commit 20359d3a12dc686ab03959b814eb4458d25633c6
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Fri Mar 22 13:24:42 2024 +0300
IGNITE-21540 Handle lock exception for transaction operations (#3462)
---
.../runner/app/ItIgniteNodeRestartTest.java | 2 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 +
.../exec/rel/TableScanNodeExecutionTest.java | 4 +-
.../ItTxAbstractDistributedTestSingleNode.java | 44 ++++
.../rebalance/ItRebalanceDistributedTest.java | 2 +
.../ignite/internal/table/ItColocationTest.java | 4 +-
.../internal/table/ItOperationRetryTest.java | 132 ++++++++++++
.../internal/table/ItTransactionRecoveryTest.java | 7 +-
.../internal/table/distributed/TableManager.java | 16 +-
.../replicator/PartitionReplicaListener.java | 93 ++++++--
.../distributed/storage/InternalTableImpl.java | 234 ++++++++++++++-------
.../distributed/TableManagerRecoveryTest.java | 4 +
.../table/distributed/TableManagerTest.java | 5 +
.../distributed/storage/InternalTableImplTest.java | 8 +-
.../apache/ignite/distributed/ItTxTestCluster.java | 4 +-
.../table/impl/DummyInternalTableImpl.java | 4 +-
.../TransactionConfigurationSchema.java | 10 +
17 files changed, 475 insertions(+), 99 deletions(-)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index a536f060d4..5e8d364097 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -503,6 +503,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
);
GcConfiguration gcConfig =
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
+ TransactionConfiguration txConfiguration =
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
var clockWaiter = new ClockWaiter(name, hybridClock);
@@ -551,6 +552,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
name,
registry,
gcConfig,
+ txConfiguration,
storageUpdateConfiguration,
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8ea348e858..a91ff35fb0 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -711,6 +711,7 @@ public class IgniteImpl implements Ignite {
name,
registry,
gcConfig,
+ txConfig,
storageUpdateConfiguration,
messagingServiceReturningToStorageOperationsPool,
clusterSvc.topologyService(),
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
index 6aa22be45f..04ef25807c 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/rel/TableScanNodeExecutionTest.java
@@ -253,7 +253,9 @@ public class TableScanNodeExecutionTest extends
AbstractExecutionTest<Object[]>
Int2ObjectMaps.singleton(0,
mock(RaftGroupService.class)),
new
SingleClusterNodeResolver(mock(ClusterNode.class))
),
- mock(TransactionInflights.class)
+ mock(TransactionInflights.class),
+ 3_000,
+ 0
);
this.dataAmount = dataAmount;
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
index b9dce5b0cc..e3688ca88e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxAbstractDistributedTestSingleNode.java
@@ -18,12 +18,19 @@
package org.apache.ignite.distributed;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runAsync;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.runMultiThreadedAsync;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ThreadLocalRandom;
@@ -34,6 +41,7 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.RepeatedTest;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
/**
@@ -166,6 +174,42 @@ public abstract class
ItTxAbstractDistributedTestSingleNode extends TxAbstractTe
assertThat(futFinishes, willSucceedFast());
}
+ @Test
+ public void testImplicitTransactionRetry() {
+ var rv = accounts.recordView();
+
+ Transaction tx = igniteTransactions.begin();
+
+ assertNull(rv.get(tx, makeKey(1)));
+
+ CompletableFuture<Void> implicitOpFut = runAsync(() -> rv.upsert(null,
makeValue(1, 1.)));
+
+ assertFalse(implicitOpFut.isDone());
+
+ tx.commit();
+
+ assertThat(implicitOpFut, willCompleteSuccessfully());
+
+ assertNotNull(rv.get(null, makeKey(1)));
+ }
+
+ @Test
+ public void testImplicitTransactionTimeout() {
+ var rv = accounts.recordView();
+
+ Transaction tx = igniteTransactions.begin();
+
+ assertNull(rv.get(tx, makeKey(1)));
+
+ CompletableFuture<Void> implicitOpFut = runAsync(() -> rv.upsert(null,
makeValue(1, 1.)));
+
+ assertFalse(implicitOpFut.isDone());
+
+ assertThat(implicitOpFut, willThrow(TransactionException.class));
+
+ assertNull(rv.get(null, makeKey(1)));
+ }
+
/**
* Finish the tx.
*
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index c1e7d69085..b9417b5f18 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1135,6 +1135,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
metaStorageManager.registerRevisionUpdateListener(function::apply);
GcConfiguration gcConfig =
clusterConfigRegistry.getConfiguration(GcConfiguration.KEY);
+ TransactionConfiguration txConfig =
clusterConfigRegistry.getConfiguration(TransactionConfiguration.KEY);
DataStorageModules dataStorageModules = new
DataStorageModules(List.of(
new PersistentPageMemoryDataStorageModule(),
@@ -1193,6 +1194,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
name,
registry,
gcConfig,
+ txConfig,
storageUpdateConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index d599540c39..4ab770b890 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -301,7 +301,9 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
observableTimestampTracker,
new TestPlacementDriver(clusterNode),
new TableRaftServiceImpl("PUBLIC.TEST", PARTS, partRafts, new
SingleClusterNodeResolver(clusterNode)),
- transactionInflights
+ transactionInflights,
+ 3_000,
+ 0
);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
new file mode 100644
index 0000000000..f329a5767e
--- /dev/null
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItOperationRetryTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.apache.ignite.internal.SessionUtils.executeUpdate;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.app.IgniteImpl;
+import org.apache.ignite.internal.network.DefaultMessagingService;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaResponse;
+import org.apache.ignite.table.RecordView;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+/**
+ * Tests of transaction operation retry.
+ */
+public class ItOperationRetryTest extends ClusterPerTestIntegrationTest {
+ /** Table name. */
+ private static final String TABLE_NAME = "test_table";
+
+ private static final int PART_ID = 0;
+
+ @BeforeEach
+ @Override
+ public void setup(TestInfo testInfo) throws Exception {
+ super.setup(testInfo);
+
+ String zoneSql = "create zone test_zone with partitions=1, replicas=3";
+ String sql = "create table " + TABLE_NAME + " (key int primary key,
val varchar(20)) with primary_zone='TEST_ZONE'";
+
+ cluster.doInSession(0, session -> {
+ executeUpdate(zoneSql, session);
+ executeUpdate(sql, session);
+ });
+ }
+
+ @Test
+ public void testLockExceptionRetry() {
+ TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
+
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
+
+ String leaseholder = waitAndGetPrimaryReplica(node(0),
tblReplicationGrp).getLeaseholder();
+
+ IgniteImpl leaseholderNode = findNodeByName(leaseholder);
+ IgniteImpl otherNode = findNode(0, initialNodes(), ignite ->
!leaseholderNode.equals(ignite.name()));
+
+ log.info("Transactions are executed from a non-primary node [node={},
primary={}].", otherNode.name(), leaseholderNode.name());
+
+ RecordView<Tuple> view =
otherNode.tables().table(TABLE_NAME).recordView();
+
+ Transaction tx1 = otherNode.transactions().begin();
+ Transaction tx2 = otherNode.transactions().begin();
+
+ view.get(tx1, Tuple.create().set("key", 1));
+
+ DefaultMessagingService messagingService = (DefaultMessagingService)
leaseholderNode.clusterService().messagingService();
+
+ AtomicBoolean lockHold = new AtomicBoolean(true);
+
+ messagingService.dropMessages((nodeName, networkMessage) -> {
+ if (nodeName.equals(otherNode.name()) && networkMessage instanceof
ReplicaResponse && lockHold.compareAndSet(true, false)) {
+ log.info("Exceptional response on lock acquisition
[resp={}].", networkMessage.getClass().getSimpleName());
+
+ tx1.commit();
+ }
+
+ return false;
+ });
+
+ view.upsert(tx2, Tuple.create().set("key", 1).set("val", "new value"));
+
+ tx2.commit();
+
+ assertEquals("new value", view.get(null, Tuple.create().set("key",
1)).value("val"));
+ }
+
+ private IgniteImpl findNodeByName(String leaseholder) {
+ return findNode(0, initialNodes(), n -> leaseholder.equals(n.name()));
+ }
+
+ private IgniteImpl findNode(int startRange, int endRange,
Predicate<IgniteImpl> filter) {
+ return IntStream.range(startRange, endRange)
+ .mapToObj(this::node)
+ .filter(filter::test)
+ .findFirst()
+ .get();
+ }
+
+ private ReplicaMeta waitAndGetPrimaryReplica(IgniteImpl node,
ReplicationGroupId tblReplicationGrp) {
+ CompletableFuture<ReplicaMeta> primaryReplicaFut =
node.placementDriver().awaitPrimaryReplica(
+ tblReplicationGrp,
+ node.clock().now(),
+ 10,
+ SECONDS
+ );
+
+ assertThat(primaryReplicaFut, willCompleteSuccessfully());
+
+ return primaryReplicaFut.join();
+ }
+}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
index e0c2877c48..16f057613e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItTransactionRecoveryTest.java
@@ -128,6 +128,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
builder.clusterConfiguration("{\n"
+ " \"transaction\": {\n"
+ " \"abandonedCheckTs\": 600000\n"
+ + " \"attemptsObtainLock\": 0\n"
+ " }\n"
+ "}\n");
}
@@ -745,7 +746,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
public void testPrimaryFailureWhileInflightInProgressAfterFirstResponse()
throws Exception {
TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
- var tblReplicationGrp = new TablePartitionId(tbl.tableId(), 0);
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetLeaseholder(node(0), tblReplicationGrp);
@@ -961,9 +962,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
public void testCursorsClosedAfterTxClose() throws Exception {
TableImpl tbl = (TableImpl) node(0).tables().table(TABLE_NAME);
- int partId = 0;
-
- var tblReplicationGrp = new TablePartitionId(tbl.tableId(), partId);
+ var tblReplicationGrp = new TablePartitionId(tbl.tableId(), PART_ID);
String leaseholder = waitAndGetPrimaryReplica(node(0),
tblReplicationGrp).getLeaseholder();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 0785fa5ce1..26e39c20da 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -198,6 +198,7 @@ import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxMessageSender;
@@ -402,12 +403,18 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
private final TransactionInflights transactionInflights;
+ private final TransactionConfiguration txCfg;
+
+ private long implicitTransactionTimeout;
+ private int attemptsObtainLock;
+
/**
* Creates a new table manager.
*
* @param nodeName Node name.
* @param registry Registry for versioned values.
* @param gcConfig Garbage collector configuration.
+ * @param txCfg Transaction configuration.
* @param storageUpdateConfig Storage update handler configuration.
* @param raftMgr Raft manager.
* @param replicaMgr Replica manager.
@@ -435,6 +442,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
String nodeName,
Consumer<LongFunction<CompletableFuture<?>>> registry,
GcConfiguration gcConfig,
+ TransactionConfiguration txCfg,
StorageUpdateConfiguration storageUpdateConfig,
MessagingService messagingService,
TopologyService topologyService,
@@ -491,6 +499,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
this.lowWatermark = lowWatermark;
this.asyncContinuationExecutor = asyncContinuationExecutor;
this.transactionInflights = transactionInflights;
+ this.txCfg = txCfg;
this.executorInclinedSchemaSyncService = new
ExecutorInclinedSchemaSyncService(schemaSyncService,
partitionOperationsExecutor);
this.executorInclinedPlacementDriver = new
ExecutorInclinedPlacementDriver(placementDriver, partitionOperationsExecutor);
@@ -603,6 +612,9 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
partitionReplicatorNodeRecovery.start();
+ implicitTransactionTimeout =
txCfg.implicitTransactionTimeout().value();
+ attemptsObtainLock = txCfg.attemptsObtainLock().value();
+
return nullCompletedFuture();
});
}
@@ -1299,7 +1311,9 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
observableTimestampTracker,
executorInclinedPlacementDriver,
tableRaftService,
- transactionInflights
+ transactionInflights,
+ implicitTransactionTimeout,
+ attemptsObtainLock
);
var table = new TableImpl(
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index ab1610be37..bb0ff8d86e 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -433,7 +433,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
return validateTableExistence(request, opTsIfDirectRo)
.thenCompose(unused -> validateSchemaMatch(request,
opTsIfDirectRo))
.thenCompose(unused -> waitForSchemasBeforeReading(request,
opTsIfDirectRo))
- .thenCompose(unused ->
processOperationRequestWithTxRwCounter(request, isPrimary, opTsIfDirectRo));
+ .thenCompose(unused ->
processOperationRequestWithTxRwCounter(senderId, request, isPrimary,
opTsIfDirectRo));
}
/**
@@ -603,6 +603,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
private CompletableFuture<?> processOperationRequest(
+ String senderId,
ReplicaRequest request,
@Nullable Boolean isPrimary,
@Nullable HybridTimestamp opStartTsIfDirectRo
@@ -610,23 +611,33 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (request instanceof ReadWriteSingleRowReplicaRequest) {
var req = (ReadWriteSingleRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processSingleEntryAction(req));
+ var opId = new OperationId(senderId, req.timestampLong());
+
+ return appendTxCommand(req.transactionId(), opId,
req.requestType(), req.full(), () -> processSingleEntryAction(req));
} else if (request instanceof ReadWriteSingleRowPkReplicaRequest) {
var req = (ReadWriteSingleRowPkReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processSingleEntryAction(req));
+ var opId = new OperationId(senderId, req.timestampLong());
+
+ return appendTxCommand(req.transactionId(), opId,
req.requestType(), req.full(), () -> processSingleEntryAction(req));
} else if (request instanceof ReadWriteMultiRowReplicaRequest) {
var req = (ReadWriteMultiRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processMultiEntryAction(req));
+ var opId = new OperationId(senderId, req.timestampLong());
+
+ return appendTxCommand(req.transactionId(), opId,
req.requestType(), req.full(), () -> processMultiEntryAction(req));
} else if (request instanceof ReadWriteMultiRowPkReplicaRequest) {
var req = (ReadWriteMultiRowPkReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processMultiEntryAction(req));
+ var opId = new OperationId(senderId, req.timestampLong());
+
+ return appendTxCommand(req.transactionId(), opId,
req.requestType(), req.full(), () -> processMultiEntryAction(req));
} else if (request instanceof ReadWriteSwapRowReplicaRequest) {
var req = (ReadWriteSwapRowReplicaRequest) request;
- return appendTxCommand(req.transactionId(), req.requestType(),
req.full(), () -> processTwoEntriesAction(req));
+ var opId = new OperationId(senderId, req.timestampLong());
+
+ return appendTxCommand(req.transactionId(), opId,
req.requestType(), req.full(), () -> processTwoEntriesAction(req));
} else if (request instanceof
ReadWriteScanRetrieveBatchReplicaRequest) {
var req = (ReadWriteScanRetrieveBatchReplicaRequest) request;
@@ -642,8 +653,10 @@ public class PartitionReplicaListener implements
ReplicaListener {
null
));
+ var opId = new OperationId(senderId, req.timestampLong());
+
// Implicit RW scan can be committed locally on a last batch or
error.
- return appendTxCommand(req.transactionId(), RequestType.RW_SCAN,
false, () -> processScanRetrieveBatchAction(req))
+ return appendTxCommand(req.transactionId(), opId,
RequestType.RW_SCAN, false, () -> processScanRetrieveBatchAction(req))
.thenCompose(rows -> {
if (allElementsAreNull(rows)) {
return completedFuture(rows);
@@ -1737,9 +1750,9 @@ public class PartitionReplicaListener implements
ReplicaListener {
txOps.futures.forEach((opType, futures) -> {
if (opType.isRwRead()) {
- txReadFutures.addAll(futures);
+ txReadFutures.addAll(futures.values());
} else {
- txUpdateFutures.addAll(futures);
+ txUpdateFutures.addAll(futures.values());
}
});
@@ -1877,12 +1890,19 @@ public class PartitionReplicaListener implements
ReplicaListener {
* Appends an operation to prevent the race between commit/rollback and
the operation execution.
*
* @param txId Transaction id.
+ * @param opId Operation id.
* @param cmdType Command type.
* @param full {@code True} if a full transaction and can be immediately
committed.
* @param op Operation closure.
* @return A future object representing the result of the given operation.
*/
- private <T> CompletableFuture<T> appendTxCommand(UUID txId, RequestType
cmdType, boolean full, Supplier<CompletableFuture<T>> op) {
+ private <T> CompletableFuture<T> appendTxCommand(
+ UUID txId,
+ OperationId opId,
+ RequestType cmdType,
+ boolean full,
+ Supplier<CompletableFuture<T>> op
+ ) {
if (full) {
return op.get().whenComplete((v, th) -> {
// Fast unlock.
@@ -1908,7 +1928,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
txOps = new TxCleanupReadyFutureList();
}
- txOps.futures.computeIfAbsent(cmdType, type -> new
ArrayList<>()).add(cleanupReadyFut);
+ txOps.futures.computeIfAbsent(cmdType, type -> new
HashMap<>()).put(opId, cleanupReadyFut);
return txOps;
});
@@ -3732,7 +3752,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
/**
* Operation type is mapped operation futures.
*/
- final Map<RequestType, List<CompletableFuture<?>>> futures = new
EnumMap<>(RequestType.class);
+ final Map<RequestType, Map<OperationId, CompletableFuture<?>>> futures
= new EnumMap<>(RequestType.class);
}
@Override
@@ -3822,6 +3842,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
private CompletableFuture<?> processOperationRequestWithTxRwCounter(
+ String senderId,
ReplicaRequest request,
@Nullable Boolean isPrimary,
@Nullable HybridTimestamp opStartTsIfDirectRo
@@ -3836,7 +3857,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
}
}
- return processOperationRequest(request, isPrimary, opStartTsIfDirectRo)
+ return processOperationRequest(senderId, request, isPrimary,
opStartTsIfDirectRo)
.whenComplete((unused, throwable) -> {
if (request instanceof ReadWriteReplicaRequest) {
txRwOperationTracker.decrementOperationCount(
@@ -3922,4 +3943,50 @@ public class PartitionReplicaListener implements
ReplicaListener {
private @Nullable BinaryRow upgrage(@Nullable BinaryRow source, int
targetSchemaVersion) {
return source == null ? null : new BinaryRowUpgrader(schemaRegistry,
targetSchemaVersion).upgrade(source);
}
+
+ /**
+ * Operation unique identifier.
+ */
+ private static class OperationId {
+ /** Operation node initiator id. */
+ private String initiatorId;
+
+ /** Timestamp. */
+ private long ts;
+
+ /**
+ * The constructor.
+ *
+ * @param initiatorId Sender node id.
+ * @param ts Timestamp.
+ */
+ public OperationId(String initiatorId, long ts) {
+ this.initiatorId = initiatorId;
+ this.ts = ts;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ OperationId that = (OperationId) o;
+
+ if (ts != that.ts) {
+ return false;
+ }
+ return initiatorId.equals(that.initiatorId);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = initiatorId.hashCode();
+ result = 31 * result + (int) (ts ^ (ts >>> 32));
+ return result;
+ }
+ }
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 68446acc84..9505e2c3b4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -29,6 +29,7 @@ import static
org.apache.ignite.internal.table.distributed.storage.RowBatch.allR
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
+import static
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
@@ -175,6 +176,12 @@ public class InternalTableImpl implements InternalTable {
/** Table raft service. */
private final TableRaftServiceImpl tableRaftService;
+ /** Implicit transaction timeout. */
+ private final long implicitTransactionTimeout;
+
+ /** Attempts to take lock. */
+ private final int attemptsObtainLock;
+
/**
* Constructor.
*
@@ -190,6 +197,8 @@ public class InternalTableImpl implements InternalTable {
* @param placementDriver Placement driver.
* @param tableRaftService Table raft service.
* @param transactionInflights Transaction inflights.
+ * @param implicitTransactionTimeout Implicit transaction timeout.
+ * @param attemptsObtainLock Attempts to take lock.
*/
public InternalTableImpl(
String tableName,
@@ -204,7 +213,9 @@ public class InternalTableImpl implements InternalTable {
HybridTimestampTracker observableTimestampTracker,
PlacementDriver placementDriver,
TableRaftServiceImpl tableRaftService,
- TransactionInflights transactionInflights
+ TransactionInflights transactionInflights,
+ long implicitTransactionTimeout,
+ int attemptsObtainLock
) {
this.tableName = tableName;
this.tableId = tableId;
@@ -220,6 +231,8 @@ public class InternalTableImpl implements InternalTable {
this.placementDriver = placementDriver;
this.tableRaftService = tableRaftService;
this.transactionInflights = transactionInflights;
+ this.implicitTransactionTimeout = implicitTransactionTimeout;
+ this.attemptsObtainLock = attemptsObtainLock;
}
/** {@inheritDoc} */
@@ -265,7 +278,25 @@ public class InternalTableImpl implements InternalTable {
* @param tx The transaction, not null if explicit.
* @param fac Replica requests factory.
* @param noWriteChecker Used to handle operations producing no updates.
- * @param retryOnLockConflict {@code True} to retry on lock conflict.
+ * @return The future.
+ */
+ private <R> CompletableFuture<R> enlistInTx(
+ BinaryRowEx row,
+ @Nullable InternalTransaction tx,
+ IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long,
ReplicaRequest> fac,
+ BiPredicate<R, ReplicaRequest> noWriteChecker
+ ) {
+ return enlistInTx(row, tx, fac, noWriteChecker, null);
+ }
+
+ /**
+ * Enlists a single row into a transaction.
+ *
+ * @param row The row.
+ * @param tx The transaction, not null if explicit.
+ * @param fac Replica requests factory.
+ * @param noWriteChecker Used to handle operations producing no updates.
+ * @param txStartTs Transaction start time or {@code null}. This parameter
is used only for retry.
* @return The future.
*/
private <R> CompletableFuture<R> enlistInTx(
@@ -273,7 +304,7 @@ public class InternalTableImpl implements InternalTable {
@Nullable InternalTransaction tx,
IgniteTriFunction<InternalTransaction, ReplicationGroupId, Long,
ReplicaRequest> fac,
BiPredicate<R, ReplicaRequest> noWriteChecker,
- boolean retryOnLockConflict
+ @Nullable Long txStartTs
) {
// Check whether proposed tx is read-only. Complete future
exceptionally if true.
// Attempting to enlist a read-only in a read-write transaction does
not corrupt the transaction itself, thus read-write transaction
@@ -308,20 +339,55 @@ public class InternalTableImpl implements InternalTable {
false,
primaryReplicaAndConsistencyToken,
noWriteChecker,
- retryOnLockConflict
+ attemptsObtainLock
);
} else {
- fut = enlistWithRetry(
+ fut = enlistAndInvoke(
actualTx,
partId,
enlistmentConsistencyToken -> fac.apply(actualTx,
partGroupId, enlistmentConsistencyToken),
implicit,
- noWriteChecker,
- retryOnLockConflict
+ noWriteChecker
);
}
- return postEnlist(fut, false, actualTx, implicit);
+ return postEnlist(fut, false, actualTx, implicit).handle((r, e) -> {
+ if (e != null) {
+ if (implicit) {
+ long ts = (txStartTs == null) ?
actualTx.startTimestamp().getPhysical() : txStartTs;
+
+ if (isRestartTransactionPossible(e) &&
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
+ return enlistInTx(row, null, fac, noWriteChecker, ts);
+ }
+ }
+
+ throw wrapReplicationException(e);
+ }
+
+ return completedFuture(r);
+ }).thenCompose(x -> x);
+ }
+
+ /**
+ * Enlists a single row into a transaction.
+ *
+ * @param keyRows Rows.
+ * @param tx The transaction.
+ * @param fac Replica requests factory.
+ * @param reducer Transform reducer.
+ * @param noOpChecker Used to handle no-op operations (producing no
updates).
+ * @return The future.
+ */
+ private <T> CompletableFuture<T> enlistInTx(
+ Collection<BinaryRowEx> keyRows,
+ @Nullable InternalTransaction tx,
+ IgnitePentaFunction<
+ Collection<? extends BinaryRow>, InternalTransaction,
ReplicationGroupId, Long, Boolean, ReplicaRequest
+ > fac,
+ Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
+ BiPredicate<T, ReplicaRequest> noOpChecker
+ ) {
+ return enlistInTx(keyRows, tx, fac, reducer, noOpChecker, null);
}
/**
@@ -332,7 +398,7 @@ public class InternalTableImpl implements InternalTable {
* @param fac Replica requests factory.
* @param reducer Transform reducer.
* @param noOpChecker Used to handle no-op operations (producing no
updates).
- * @param retryOnLockConflict {@code True} to retry on lock conflict.
+ * @param txStartTs Transaction start time or {@code null}. This parameter
is used only for retry.
* @return The future.
*/
private <T> CompletableFuture<T> enlistInTx(
@@ -343,7 +409,7 @@ public class InternalTableImpl implements InternalTable {
> fac,
Function<Collection<RowBatch>, CompletableFuture<T>> reducer,
BiPredicate<T, ReplicaRequest> noOpChecker,
- boolean retryOnLockConflict
+ @Nullable Long txStartTs
) {
// Check whether proposed tx is read-only. Complete future
exceptionally if true.
// Attempting to enlist a read-only in a read-write transaction does
not corrupt the transaction itself, thus read-write transaction
@@ -386,17 +452,16 @@ public class InternalTableImpl implements InternalTable {
false,
primaryReplicaAndConsistencyToken,
noOpChecker,
- retryOnLockConflict
+ attemptsObtainLock
);
} else {
- fut = enlistWithRetry(
+ fut = enlistAndInvoke(
actualTx,
partitionId,
enlistmentConsistencyToken ->
fac.apply(rowBatch.requestedRows, actualTx,
partGroupId, enlistmentConsistencyToken, full),
full,
- noOpChecker,
- retryOnLockConflict
+ noOpChecker
);
}
@@ -405,7 +470,21 @@ public class InternalTableImpl implements InternalTable {
CompletableFuture<T> fut =
reducer.apply(rowBatchByPartitionId.values());
- return postEnlist(fut, implicit && !singlePart, actualTx, full);
+ return postEnlist(fut, implicit && !singlePart, actualTx,
full).handle((r, e) -> {
+ if (e != null) {
+ if (implicit) {
+ long ts = (txStartTs == null) ?
actualTx.startTimestamp().getPhysical() : txStartTs;
+
+ if (isRestartTransactionPossible(e) &&
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
+ return enlistInTx(keyRows, null, fac, reducer,
noOpChecker, ts);
+ }
+ }
+
+ throw wrapReplicationException(e);
+ }
+
+ return completedFuture(r);
+ }).thenCompose(x -> x);
}
private InternalTransaction startImplicitRwTxIfNeeded(@Nullable
InternalTransaction tx) {
@@ -468,7 +547,7 @@ public class InternalTableImpl implements InternalTable {
if (primaryReplicaAndConsistencyToken != null) {
fut = replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
mapFunc.apply(primaryReplicaAndConsistencyToken.get2()));
} else {
- fut = enlistWithRetry(tx, partId, mapFunc, false, null, false);
+ fut = enlistAndInvoke(tx, partId, mapFunc, false, null);
}
return postEnlist(fut, false, tx, false);
@@ -486,39 +565,25 @@ public class InternalTableImpl implements InternalTable {
}
/**
- * Partition enlisting with retrying.
+ * Enlists a partition and invokes the replica.
*
* @param tx Internal transaction.
* @param partId Partition number.
* @param mapFunc Function to create replica request with new enlistment
consistency token.
* @param full {@code True} if is a full transaction.
* @param noWriteChecker Used to handle operations producing no updates.
- * @param retryOnLockConflict {@code True} to retry on lock conflict.
* @return The future.
*/
- private <R> CompletableFuture<R> enlistWithRetry(
+ private <R> CompletableFuture<R> enlistAndInvoke(
InternalTransaction tx,
int partId,
Function<Long, ReplicaRequest> mapFunc,
boolean full,
- @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
- boolean retryOnLockConflict
+ @Nullable BiPredicate<R, ReplicaRequest> noWriteChecker
) {
- return (CompletableFuture<R>) enlist(partId, tx)
+ return enlist(partId, tx)
.thenCompose(primaryReplicaAndConsistencyToken ->
- trackingInvoke(tx, partId, mapFunc, full,
primaryReplicaAndConsistencyToken, noWriteChecker, retryOnLockConflict))
- .handle((res0, e) -> {
- if (e != null) {
- // We can safely retry indefinitely on deadlock
prevention.
- if (retryOnLockConflict && e.getCause() instanceof
LockException) {
- return enlistWithRetry(tx, partId, mapFunc, full,
noWriteChecker, true);
- }
-
- return failedFuture(e);
- }
-
- return completedFuture(res0);
- }).thenCompose(x -> x);
+ trackingInvoke(tx, partId, mapFunc, full,
primaryReplicaAndConsistencyToken, noWriteChecker, attemptsObtainLock));
}
/**
@@ -540,7 +605,7 @@ public class InternalTableImpl implements InternalTable {
boolean full,
IgniteBiTuple<ClusterNode, Long> primaryReplicaAndConsistencyToken,
@Nullable BiPredicate<R, ReplicaRequest> noWriteChecker,
- boolean retryOnLockConflict
+ int retryOnLockConflict
) {
assert !tx.isReadOnly() : format("Tracking invoke is available only
for read-write transactions [tx={}].", tx);
@@ -573,14 +638,27 @@ public class InternalTableImpl implements InternalTable {
}
return res;
- }).exceptionally(e -> {
- if (retryOnLockConflict && e.getCause() instanceof
LockException) {
- transactionInflights.removeInflight(tx.id()); // Will be
retried.
+ }).handle((r, e) -> {
+ if (e != null) {
+ if (retryOnLockConflict > 0 && e.getCause() instanceof
LockException) {
+ transactionInflights.removeInflight(tx.id()); // Will
be retried.
+
+ return trackingInvoke(
+ tx,
+ partId,
+ ignored -> request,
+ full,
+ primaryReplicaAndConsistencyToken,
+ noWriteChecker,
+ retryOnLockConflict - 1
+ );
+ }
+
+ ExceptionUtils.sneakyThrow(e);
}
- ExceptionUtils.sneakyThrow(e);
- return null; // Unreachable.
- });
+ return completedFuture(r);
+ }).thenCompose(x -> x);
} else {
return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(),
request);
}
@@ -798,8 +876,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> false,
- false
+ (res, req) -> false
);
}
@@ -877,8 +954,7 @@ public class InternalTableImpl implements InternalTable {
(keyRows0, txo, groupId, enlistmentConsistencyToken, full) ->
readWriteMultiRowPkReplicaRequest(RW_GET_ALL,
keyRows0, txo, groupId, enlistmentConsistencyToken, full),
InternalTableImpl::collectMultiRowsResponsesWithRestoreOrder,
- (res, req) -> false,
- false
+ (res, req) -> false
);
}
@@ -997,8 +1073,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> false,
- false
+ (res, req) -> false
);
}
@@ -1010,8 +1085,7 @@ public class InternalTableImpl implements InternalTable {
tx,
this::upsertAllInternal,
RowBatch::allResultFutures,
- (res, req) -> false,
- false
+ (res, req) -> false
);
}
@@ -1021,13 +1095,12 @@ public class InternalTableImpl implements InternalTable
{
InternalTransaction tx = txManager.begin(observableTimestampTracker);
TablePartitionId partGroupId = new TablePartitionId(tableId,
partition);
- CompletableFuture<Void> fut = enlistWithRetry(
+ CompletableFuture<Void> fut = enlistAndInvoke(
tx,
partition,
enlistmentConsistencyToken -> upsertAllInternal(rows, deleted,
tx, partGroupId, enlistmentConsistencyToken, true),
true,
- null,
- true // Allow auto retries for data streamer.
+ null
);
return postEnlist(fut, false, tx, true); // Will be committed in one
RTT.
@@ -1051,8 +1124,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> false,
- false
+ (res, req) -> false
);
}
@@ -1074,8 +1146,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> !res,
- false
+ (res, req) -> !res
);
}
@@ -1104,8 +1175,7 @@ public class InternalTableImpl implements InternalTable {
// All values are null, this means nothing was deleted.
return true;
- },
- false
+ }
);
}
@@ -1153,8 +1223,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> !res,
- false
+ (res, req) -> !res
);
}
@@ -1180,8 +1249,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> !res,
- false
+ (res, req) -> !res
);
}
@@ -1203,8 +1271,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> res == null,
- false
+ (res, req) -> res == null
);
}
@@ -1226,8 +1293,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> !res,
- false
+ (res, req) -> !res
);
}
@@ -1249,8 +1315,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> !res,
- false
+ (res, req) -> !res
);
}
@@ -1272,8 +1337,7 @@ public class InternalTableImpl implements InternalTable {
.full(tx == null)
.coordinatorId(txo.coordinatorId())
.build(),
- (res, req) -> res == null,
- false
+ (res, req) -> res == null
);
}
@@ -1295,8 +1359,7 @@ public class InternalTableImpl implements InternalTable {
// All values are null, this means nothing was deleted.
return true;
- },
- false
+ }
);
}
@@ -1329,8 +1392,7 @@ public class InternalTableImpl implements InternalTable {
// All values are null, this means nothing was deleted.
return true;
- },
- false
+ }
);
}
@@ -2162,4 +2224,26 @@ public class InternalTableImpl implements InternalTable {
return readWriteMultiRowReplicaRequest(
RequestType.RW_UPSERT_ALL, keyRows0, deleted, txo, groupId,
enlistmentConsistencyToken, full);
}
+
+ /**
+ * Ensure that the exception allows you to restart a transaction.
+ *
+ * @param e Exception to check.
+ * @return True if retrying is possible, false otherwise.
+ */
+ private boolean isRestartTransactionPossible(Throwable e) {
+ if (e instanceof LockException) {
+ return true;
+ } else if (e instanceof TransactionException && e.getCause()
instanceof LockException) {
+ return true;
+ } else if (e instanceof CompletionException && e.getCause() instanceof
LockException) {
+ return true;
+ } else if (e instanceof CompletionException
+ && e.getCause() instanceof TransactionException
+ && e.getCause().getCause() instanceof LockException) {
+ return true;
+ }
+
+ return false;
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 2cc7e95082..3d9c5fcea5 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -110,6 +110,7 @@ import
org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.thread.StripedThreadPoolExecutor;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
@@ -149,6 +150,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
@InjectConfiguration
private GcConfiguration gcConfig;
@InjectConfiguration
+ private TransactionConfiguration txConfig;
+ @InjectConfiguration
private StorageUpdateConfiguration storageUpdateConfiguration;
@WorkDirectory
private Path workDir;
@@ -299,6 +302,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
NODE_NAME,
revisionUpdater,
gcConfig,
+ txConfig,
storageUpdateConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 00a4ce4480..5445984c3d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -126,6 +126,7 @@ import
org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
@@ -208,6 +209,9 @@ public class TableManagerTest extends IgniteAbstractTest {
@InjectConfiguration
private GcConfiguration gcConfig;
+ @InjectConfiguration
+ private TransactionConfiguration txConfig;
+
/** Storage update configuration. */
@InjectConfiguration
private StorageUpdateConfiguration storageUpdateConfiguration;
@@ -753,6 +757,7 @@ public class TableManagerTest extends IgniteAbstractTest {
NODE_NAME,
revisionUpdater,
gcConfig,
+ txConfig,
storageUpdateConfiguration,
clusterService.messagingService(),
clusterService.topologyService(),
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index b8871d1007..0919a7c71a 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -72,7 +72,9 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
new HybridTimestampTracker(),
mock(PlacementDriver.class),
new TableRaftServiceImpl("test", 1, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
- mock(TransactionInflights.class)
+ mock(TransactionInflights.class),
+ 3_000,
+ 0
);
// Let's check the empty table.
@@ -119,7 +121,9 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
new HybridTimestampTracker(),
mock(PlacementDriver.class),
new TableRaftServiceImpl("test", 3, Int2ObjectMaps.emptyMap(),
new SingleClusterNodeResolver(mock(ClusterNode.class))),
- mock(TransactionInflights.class)
+ mock(TransactionInflights.class),
+ 3_000,
+ 0
);
List<BinaryRowEx> originalRows = List.of(
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index ee9fa70414..f9af509904 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -718,7 +718,9 @@ public class ItTxTestCluster {
timestampTracker,
placementDriver,
new TableRaftServiceImpl(tableName, 1, clients,
nodeResolver),
- clientTransactionInflights
+ clientTransactionInflights,
+ 500,
+ 0
),
new DummySchemaManagerImpl(schemaDescriptor),
clientTxManager.lockManager(),
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
index c9bba46a80..c998f5314d 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/impl/DummyInternalTableImpl.java
@@ -246,7 +246,9 @@ public class DummyInternalTableImpl extends
InternalTableImpl {
Int2ObjectMaps.singleton(PART_ID,
mock(RaftGroupService.class)),
new SingleClusterNodeResolver(LOCAL_NODE)
),
- transactionInflights
+ transactionInflights,
+ 3_000,
+ 0
);
RaftGroupService svc =
tableRaftService().partitionRaftGroupService(PART_ID);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
index 57ee2c977b..84e447cb43 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -34,4 +34,14 @@ public class TransactionConfigurationSchema {
@Range(min = 0)
@Value(hasDefault = true)
public final long abandonedCheckTs = DEFAULT_ABANDONED_CHECK_TS;
+
+ /** Timeout for implicit transactions. */
+ @Range(min = 0)
+ @Value(hasDefault = true)
+ public final long implicitTransactionTimeout = 3_000;
+
+ /** A transaction tries to take lock several times until it throws an
exception {@lonk org.apache.ignite.tx.TransactionException}. */
+ @Range(min = 0)
+ @Value(hasDefault = true)
+ public final int attemptsObtainLock = 3;
}