This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 765c7ee992f IGNITE-26458 Skip commit partition+cleanup path for RW-R
transactions
765c7ee992f is described below
commit 765c7ee992fdc8a3addb7c5049f968383e0576f7
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Fri Oct 3 12:54:14 2025 +0300
IGNITE-26458 Skip commit partition+cleanup path for RW-R transactions
---
.../handler/ClientInboundMessageHandler.java | 2 +-
.../apache/ignite/client/fakes/FakeTxManager.java | 1 +
.../partition/replicator/TxRecoveryEngine.java | 6 +-
modules/runner/build.gradle | 20 +++
...Benchmark.java => ClientKvGetAllBenchmark.java} | 30 +++--
...k.java => ClientKvGetAllExplicitBenchmark.java} | 29 +++--
.../internal/benchmark/ClientKvGetBenchmark.java | 2 +-
.../runner/app/client/ItThinClientSqlTest.java | 29 +++++
.../app/client/ItThinClientTransactionsTest.java | 67 +++++++---
.../ItThinClientTransactionsWithReplicasTest.java | 6 +-
.../InflightTransactionalOperationTracker.java | 12 +-
.../engine/QueryTransactionWrapperSelfTest.java | 35 +++---
.../ItAbstractInternalTableScanTest.java | 14 +--
.../ItInternalTableReadOnlyScanTest.java | 2 +-
.../ignite/distributed/ItTxStateLocalMapTest.java | 34 ++---
.../ignite/internal/table/ItColocationTest.java | 1 +
.../distributed/storage/InternalTableImpl.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 6 +-
.../ZonePartitionReplicaListenerTest.java | 6 +-
.../distributed/storage/InternalTableImplTest.java | 5 +-
.../apache/ignite/internal/tx/ItTxCleanupTest.java | 7 ++
.../tx/distributed/ItTransactionRecoveryTest.java | 1 +
.../org/apache/ignite/internal/tx/TxManager.java | 12 +-
.../internal/tx/impl/ReadOnlyTransactionImpl.java | 4 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 1 +
.../internal/tx/impl/TransactionInflights.java | 139 ++++++++++++++++-----
.../internal/tx/impl/TxCleanupRequestSender.java | 42 ++++---
.../ignite/internal/tx/impl/TxManagerImpl.java | 36 +++++-
.../apache/ignite/internal/tx/TxManagerTest.java | 10 +-
.../tx/impl/ReadWriteTransactionImplTest.java | 4 +-
30 files changed, 403 insertions(+), 164 deletions(-)
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 5b86b2e261d..6fe212bac8e 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -1094,7 +1094,7 @@ public class ClientInboundMessageHandler
&&
primaryReplicaMaxStartTime.compareAndSet(lastSentMaxStartTime,
currentMaxStartTime);
if (primaryReplicasUpdated && LOG.isInfoEnabled()) {
- LOG.info("Partition primary replica changed, notifying client
[connectionId=" + connectionId + ", remoteAddress="
+ LOG.info("Partition primary replicas have changed, notifying the
client [connectionId=" + connectionId + ", remoteAddress="
+ ctx.channel().remoteAddress() + ']');
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 3673094029e..8e52d164a73 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -222,6 +222,7 @@ public class FakeTxManager implements TxManager {
ReplicationGroupId commitPartition,
boolean commitIntent,
boolean timeoutExceeded,
+ boolean recovery,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
index 694f8c02ddd..0ca53daf1d0 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
@@ -69,10 +69,8 @@ public class TxRecoveryEngine {
replicationGroupId,
false,
false,
- Map.of(
- replicationGroupId,
-
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))
- ),
+ true,
+ Map.of(replicationGroupId,
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))),
txId
)
.whenComplete((v, ex) -> runCleanupOnNode(replicationGroupId,
txId, senderId));
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 6cbe14ca4dc..b7edce5679a 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -301,6 +301,26 @@ tasks.register('runClientGetBenchmark', JavaExec) {
enableAssertions = true
}
+tasks.register('runClientGetAllBenchmark', JavaExec) {
+ mainClass = 'org.apache.ignite.internal.benchmark.ClientKvGetAllBenchmark'
+
+ jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true',
'-Xmx16g']
+
+ classpath = sourceSets.integrationTest.runtimeClasspath
+
+ enableAssertions = true
+}
+
+tasks.register('runClientGetAllExplicitBenchmark', JavaExec) {
+ mainClass =
'org.apache.ignite.internal.benchmark.ClientKvGetAllExplicitBenchmark'
+
+ jvmArgs += addOpens + ['-Dio.netty.tryReflectionSetAccessible=true',
'-Xmx16g']
+
+ classpath = sourceSets.integrationTest.runtimeClasspath
+
+ enableAssertions = true
+}
+
jar {
manifest {
attributes(
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllBenchmark.java
similarity index 78%
copy from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
copy to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllBenchmark.java
index a2ab9b6653d..b85e26a6c46 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllBenchmark.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.benchmark;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.table.Tuple;
@@ -28,13 +30,16 @@ import org.openjdk.jmh.runner.RunnerException;
/**
* Put then Get benchmark.
*/
-public class ClientKvGetBenchmark extends ClientKvBenchmark {
+public class ClientKvGetAllBenchmark extends ClientKvBenchmark {
@Param("1000")
private int keysPerThread;
@Param("1000")
private int loadBatchSize;
+ @Param("1000")
+ private int batch;
+
@Override
public void setUp() {
super.setUp();
@@ -85,25 +90,30 @@ public class ClientKvGetBenchmark extends ClientKvBenchmark
{
private final ThreadLocal<long[]> gen = ThreadLocal.withInitial(() -> new
long[1]);
/**
- * Benchmark for KV upsert via embedded client.
+ * Benchmark for KV get via embedded client.
*/
@Benchmark
public void get() {
- long[] cur = gen.get();
- cur[0] = cur[0] + 1;
+ List<Tuple> keys = new ArrayList<>();
- int id = (int) (base.get() + cur[0] % keysPerThread);
+ for (int i = 0; i < batch; i++) {
+ long[] cur = gen.get();
+ cur[0] = cur[0] + 1;
+
+ int id = (int) (base.get() + cur[0] % keysPerThread);
+ Tuple key = Tuple.create().set("ycsb_key", id);
+ keys.add(key);
+ }
- Tuple key = Tuple.create().set("ycsb_key", id);
- Tuple val = kvView.get(null, key);
- assert val != null : Thread.currentThread().getName() + " " + key;
+ Map<Tuple, Tuple> map = kvView.getAll(null, keys);
+ assert map.size() == batch : Thread.currentThread().getName() + " " +
keys;
}
/**
* Benchmark's entry point. Can be started from command line:
- * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10
jmh.threads=1'
+ * ./gradlew ":ignite-runner:runClientGetAllBenchmark"
--args='jmh.batch=10 jmh.threads=1'
*/
public static void main(String[] args) throws RunnerException {
- runBenchmark(ClientKvGetBenchmark.class, args);
+ runBenchmark(ClientKvGetAllBenchmark.class, args);
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllExplicitBenchmark.java
similarity index 78%
copy from
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
copy to
modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllExplicitBenchmark.java
index a2ab9b6653d..572703d4898 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetAllExplicitBenchmark.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.Transaction;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.runner.RunnerException;
@@ -28,13 +29,16 @@ import org.openjdk.jmh.runner.RunnerException;
/**
* Put then Get benchmark.
*/
-public class ClientKvGetBenchmark extends ClientKvBenchmark {
+public class ClientKvGetAllExplicitBenchmark extends ClientKvBenchmark {
@Param("1000")
private int keysPerThread;
@Param("1000")
private int loadBatchSize;
+ @Param("5")
+ private int batch;
+
@Override
public void setUp() {
super.setUp();
@@ -85,25 +89,30 @@ public class ClientKvGetBenchmark extends ClientKvBenchmark
{
private final ThreadLocal<long[]> gen = ThreadLocal.withInitial(() -> new
long[1]);
/**
- * Benchmark for KV upsert via embedded client.
+ * Benchmark for KV get via embedded client.
*/
@Benchmark
public void get() {
- long[] cur = gen.get();
- cur[0] = cur[0] + 1;
+ Transaction tx = publicIgnite.transactions().begin();
+
+ for (int i = 0; i < batch; i++) {
+ long[] cur = gen.get();
+ cur[0] = cur[0] + 1;
- int id = (int) (base.get() + cur[0] % keysPerThread);
+ int id = (int) (base.get() + cur[0] % keysPerThread);
+ Tuple key = Tuple.create().set("ycsb_key", id);
+ Tuple val = kvView.get(null, key);
+ assert val != null : Thread.currentThread().getName() + " " + key;
+ }
- Tuple key = Tuple.create().set("ycsb_key", id);
- Tuple val = kvView.get(null, key);
- assert val != null : Thread.currentThread().getName() + " " + key;
+ tx.commit();
}
/**
* Benchmark's entry point. Can be started from command line:
- * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10
jmh.threads=1'
+ * ./gradlew ":ignite-runner:runClientGetAllExplicitBenchmark"
--args='jmh.batch=10 jmh.threads=1'
*/
public static void main(String[] args) throws RunnerException {
- runBenchmark(ClientKvGetBenchmark.class, args);
+ runBenchmark(ClientKvGetAllExplicitBenchmark.class, args);
}
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
index a2ab9b6653d..e2ddfdb254f 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/ClientKvGetBenchmark.java
@@ -101,7 +101,7 @@ public class ClientKvGetBenchmark extends ClientKvBenchmark
{
/**
* Benchmark's entry point. Can be started from command line:
- * ./gradlew ":ignite-runner:runClientPutBenchmark" --args='jmh.batch=10
jmh.threads=1'
+ * ./gradlew ":ignite-runner:runClientGetBenchmark" --args='jmh.batch=10
jmh.threads=1'
*/
public static void main(String[] args) throws RunnerException {
runBenchmark(ClientKvGetBenchmark.class, args);
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
index ca14d6b4a75..41652b6ef47 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientSqlTest.java
@@ -37,8 +37,13 @@ import java.util.UUID;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;
import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.commands.CatalogUtils;
import org.apache.ignite.internal.security.authentication.UserDetails;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.sql.ColumnMetadata;
import org.apache.ignite.sql.ColumnType;
@@ -57,6 +62,7 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -741,6 +747,29 @@ public class ItThinClientSqlTest extends
ItAbstractThinClientTest {
}
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-26567")
+ public void testBroadcastQueryTxInflightStateCleanup() {
+ IgniteSql sql = client().sql();
+
+ sql.execute(null, "CREATE TABLE t1 (id INT PRIMARY KEY, val
VARCHAR)").close();
+ sql.execute(null, String.format("CREATE INDEX IF NOT EXISTS idx1 ON t1
(val)"));
+ sql.execute(null, "INSERT INTO t1 (id, val) VALUES (1,
'test1')").close();
+
+ try (ResultSet<SqlRow> rs = sql.execute(null, "SELECT id FROM t1 WHERE
val = ?", "test1")) {
+ assertTrue(rs.hasNext());
+ assertEquals(1, rs.next().intValue(0));
+ assertFalse(rs.hasNext());
+ }
+
+ for (int i = 0; i < nodes(); i++) {
+ IgniteImpl server = TestWrappers.unwrapIgniteImpl(server(i));
+ TxManager txManager = server.txManager();
+ TransactionInflights transactionInflights =
IgniteTestUtils.getFieldValue(txManager, "transactionInflights");
+ assertFalse(transactionInflights.hasActiveInflights(), "Expecting
no active inflights");
+ }
+ }
+
private static class Pojo {
public int num;
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 1357420abc2..3043f8aae18 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -57,7 +57,6 @@ import org.apache.ignite.internal.client.TcpIgniteClient;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.client.tx.ClientTransaction;
-import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.table.partition.HashPartition;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.TxState;
@@ -505,7 +504,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
int start,
int count,
Map<Partition, ClusterNode> map,
- InternalClusterNode clusterNode,
+ ClusterNode clusterNode,
Table table
) {
String clusterNodeName = clusterNode.name();
@@ -595,7 +594,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
- List<Tuple> tuples0 = generateKeysForNode(200, 2, map,
server0.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(200, 2, map,
server0.cluster().localNode(), table);
Tuple key = tuples0.get(0);
Tuple val = val(key.intValue(0) + "");
@@ -626,8 +625,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
- List<Tuple> tuples0 = generateKeysForNode(300, 1, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(310, 1, map,
server1.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(300, 1, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(310, 1, map,
server1.cluster().localNode(), table);
Map<Tuple, Tuple> data = new HashMap<>();
@@ -658,8 +657,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
- List<Tuple> tuples0 = generateKeysForNode(400, 1, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(410, 1, map,
server1.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(400, 1, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(410, 1, map,
server1.cluster().localNode(), table);
Map<Tuple, Tuple> data = new HashMap<>();
@@ -707,8 +706,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
- List<Tuple> tuples0 = generateKeysForNode(500, 1, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(510, 80, map,
server1.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(500, 1, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(510, 80, map,
server1.cluster().localNode(), table);
ClientLazyTransaction tx0 = (ClientLazyTransaction)
client().transactions().begin();
@@ -834,8 +833,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
- List<Tuple> tuples0 = generateKeysForNode(600, 50, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(610, 50, map,
server1.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(600, 50, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(610, 50, map,
server1.cluster().localNode(), table);
Map<Tuple, Tuple> batch = new HashMap<>();
@@ -865,8 +864,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
- List<Tuple> tuples0 = generateKeysForNode(700, 50, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(710, 50, map,
server1.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(700, 50, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(710, 50, map,
server1.cluster().localNode(), table);
Map<Tuple, Tuple> batch = new HashMap<>();
@@ -931,6 +930,44 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
tx.commit();
}
+ @Test
+ void testExplicitReadWriteTransaction() {
+ ClientTable table = (ClientTable) table();
+
+ KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
+
+ // Load partition map to ensure all entries are directly mapped.
+ Map<Partition, ClusterNode> map =
table.partitionManager().primaryReplicasAsync().join();
+
+ IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
+
+ List<Tuple> tuples0 = generateKeysForNode(600, 20, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(600, 20, map,
server1.cluster().localNode(), table);
+
+ Tuple k1 = tuples0.get(0);
+ Tuple v1 = val(tuples0.get(0).intValue(0) + "");
+
+ Tuple k2 = tuples1.get(1);
+ Tuple v2 = val(tuples1.get(1).intValue(0) + "");
+
+ Transaction tx0 = client().transactions().begin();
+ kvView.put(tx0, k1, v1);
+ kvView.put(tx0, k2, v2);
+ tx0.commit();
+
+ Map<Tuple, Tuple> crossPartitionBatch = new HashMap<>();
+ crossPartitionBatch.put(k1, v1);
+ crossPartitionBatch.put(k2, v2);
+
+ Map<Tuple, Tuple> res = kvView.getAll(null,
crossPartitionBatch.keySet());
+
+ assertEquals(crossPartitionBatch, res);
+
+ assertEquals(v1, kvView.get(null, k1));
+ assertEquals(v2, kvView.get(null, k2));
+ }
+
@Test
void testExplicitReadOnlyTransaction() {
ClientTable table = (ClientTable) table();
@@ -943,8 +980,8 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
IgniteImpl server0 = TestWrappers.unwrapIgniteImpl(server(0));
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
- List<Tuple> tuples0 = generateKeysForNode(600, 20, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(600, 20, map,
server1.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(600, 20, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(600, 20, map,
server1.cluster().localNode(), table);
Tuple k1 = tuples0.get(0);
Tuple v1 = val(tuples0.get(0).intValue(0) + "");
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
index fd9458925ba..f10eaf17548 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsWithReplicasTest.java
@@ -51,9 +51,9 @@ public class ItThinClientTransactionsWithReplicasTest extends
ItAbstractThinClie
IgniteImpl server1 = TestWrappers.unwrapIgniteImpl(server(1));
IgniteImpl server2 = TestWrappers.unwrapIgniteImpl(server(2));
- List<Tuple> tuples0 = generateKeysForNode(100, 1, map,
server0.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples1 = generateKeysForNode(100, 1, map,
server1.clusterService().topologyService().localMember(), table);
- List<Tuple> tuples2 = generateKeysForNode(100, 1, map,
server2.clusterService().topologyService().localMember(), table);
+ List<Tuple> tuples0 = generateKeysForNode(100, 1, map,
server0.cluster().localNode(), table);
+ List<Tuple> tuples1 = generateKeysForNode(100, 1, map,
server1.cluster().localNode(), table);
+ List<Tuple> tuples2 = generateKeysForNode(100, 1, map,
server2.cluster().localNode(), table);
if (tuples0.isEmpty() || tuples1.isEmpty() || tuples2.isEmpty()) {
return; // Skip the test if assignments are bad.
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
index 7f1c9bcca7c..7b2a8a761d4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/InflightTransactionalOperationTracker.java
@@ -38,7 +38,9 @@ class InflightTransactionalOperationTracker implements
TransactionalOperationTra
@Override
public void registerOperationStart(InternalTransaction tx) {
if (shouldBeTracked(tx)) {
- if (!delegate.addInflight(tx.id(), tx.isReadOnly())) {
+ boolean result = tx.isReadOnly() ?
delegate.addScanInflight(tx.id()) : delegate.track(tx.id());
+
+ if (!result) {
throw new TransactionException(TX_ALREADY_FINISHED_ERR,
format("Transaction is already finished [tx={}]", tx));
}
}
@@ -47,11 +49,13 @@ class InflightTransactionalOperationTracker implements
TransactionalOperationTra
@Override
public void registerOperationFinish(InternalTransaction tx) {
if (shouldBeTracked(tx)) {
- delegate.removeInflight(tx.id());
+ if (tx.isReadOnly()) {
+ delegate.removeInflight(tx.id());
+ }
}
}
private static boolean shouldBeTracked(InternalTransaction tx) {
- return tx.isReadOnly() && !tx.implicit();
- }
+ return !tx.implicit();
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
index 272cf855af1..219b1d2313f 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/QueryTransactionWrapperSelfTest.java
@@ -70,7 +70,8 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
public void testImplicitTransactionAttributes() {
prepareTransactionsMocks();
- when(transactionInflights.addInflight(any(),
anyBoolean())).thenReturn(true);
+ when(transactionInflights.track(any())).thenReturn(true);
+ when(transactionInflights.addScanInflight(any())).thenReturn(true);
QueryTransactionContext transactionHandler = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
new
InflightTransactionalOperationTracker(transactionInflights));
@@ -143,6 +144,7 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
return NoOpTransaction.readWrite("test", implicit);
});
+ when(transactionInflights.track(any())).thenReturn(true);
txCtx.handleControlStatement(txStartStmt);
assertThrowsSqlException(
@@ -163,23 +165,24 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
QueryTransactionContext implicitDmlTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
new
InflightTransactionalOperationTracker(transactionInflights));
implicitDmlTxCtx.getOrStartSqlManaged(false, false);
- // Check that RW txns do not create tx inflights.
+ // Check that RW txns are tracked.
log.info("inflights={}", inflights);
- assertTrue(inflights.isEmpty());
+ assertEquals(1, inflights.size());
QueryTransactionContext implicitQueryTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, null,
new
InflightTransactionalOperationTracker(transactionInflights));
QueryTransactionWrapper implicitQueryTxWrapper =
implicitQueryTxCtx.getOrStartSqlManaged(true, false);
assertTrue(inflights.contains(implicitQueryTxWrapper.unwrap().id()));
- implicitQueryTxWrapper.finalise();
- assertTrue(inflights.isEmpty());
+ implicitQueryTxWrapper.finalise().join();
+ assertEquals(1, inflights.size());
NoOpTransaction rwTx = NoOpTransaction.readWrite("test-rw", false);
QueryTransactionContext explicitRwTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, rwTx,
new
InflightTransactionalOperationTracker(transactionInflights));
- explicitRwTxCtx.getOrStartSqlManaged(true, false);
- // Check that RW txns do not create tx inflights.
- assertTrue(inflights.isEmpty());
+ QueryTransactionWrapper explicitRwTxWrapper =
explicitRwTxCtx.getOrStartSqlManaged(true, false);
+ assertTrue(inflights.contains(explicitRwTxWrapper.unwrap().id()));
+ // Check that RW txns are tracked.
+ assertEquals(2, inflights.size());
NoOpTransaction roTx = NoOpTransaction.readOnly("test-ro", false);
QueryTransactionContext explicitRoTxCtx = new
QueryTransactionContextImpl(txManager, observableTimeTracker, roTx,
@@ -187,7 +190,7 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
QueryTransactionWrapper explicitRoTxWrapper =
explicitRoTxCtx.getOrStartSqlManaged(true, false);
assertTrue(inflights.contains(explicitRoTxWrapper.unwrap().id()));
explicitRoTxWrapper.finalise();
- assertTrue(inflights.isEmpty());
+ assertEquals(2, inflights.size());
}
@Test
@@ -207,25 +210,25 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
when(sqlStartRwTx.getMode()).thenAnswer(inv ->
IgniteSqlStartTransactionMode.READ_WRITE);
scriptRwTxCtx.handleControlStatement(sqlStartRwTx);
- assertTrue(inflights.isEmpty());
+ assertFalse(inflights.isEmpty());
ScriptTransactionContext scriptRoTxCtx = new
ScriptTransactionContext(txCtx, operationTracker);
IgniteSqlStartTransaction sqlStartRoTx =
mock(IgniteSqlStartTransaction.class);
when(sqlStartRoTx.getMode()).thenAnswer(inv ->
IgniteSqlStartTransactionMode.READ_ONLY);
scriptRoTxCtx.handleControlStatement(sqlStartRoTx);
- assertEquals(1, inflights.size());
+ assertEquals(2, inflights.size());
QueryTransactionWrapper wrapper =
scriptRoTxCtx.getOrStartSqlManaged(true, false);
- assertEquals(1, inflights.size());
+ assertEquals(2, inflights.size());
// ScriptTransactionWrapperImpl.commitImplicit is noop.
wrapper.finalise();
- assertEquals(1, inflights.size());
+ assertEquals(2, inflights.size());
IgniteSqlCommitTransaction sqlCommitTx =
mock(IgniteSqlCommitTransaction.class);
scriptRoTxCtx.handleControlStatement(sqlCommitTx);
- assertTrue(inflights.isEmpty());
+ assertEquals(1, inflights.size());
}
private void prepareTransactionsMocks() {
@@ -239,8 +242,10 @@ public class QueryTransactionWrapperSelfTest extends
BaseIgniteAbstractTest {
}
private void prepareTxInflightsMocks(Set<UUID> inflights) {
- when(transactionInflights.addInflight(any(),
anyBoolean())).thenAnswer(inv -> inflights.add(inv.getArgument(0)));
+ when(transactionInflights.addScanInflight(any())).thenAnswer(inv ->
inflights.add(inv.getArgument(0)));
doAnswer(inv ->
inflights.remove(inv.getArgument(0))).when(transactionInflights).removeInflight(any());
+
+ when(transactionInflights.track(any())).thenAnswer(inv ->
inflights.add(inv.getArgument(0)));
}
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
index 9c57e845874..af51cae7bde 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItAbstractInternalTableScanTest.java
@@ -21,6 +21,8 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@@ -69,7 +71,6 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.impl.DummyInternalTableImpl;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.TxState;
import org.apache.ignite.internal.tx.configuration.TransactionConfiguration;
import org.apache.ignite.internal.type.NativeTypes;
import org.jetbrains.annotations.Nullable;
@@ -277,7 +278,7 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
assertEquals(NoSuchElementException.class,
unwrapCause(gotException.get()).getClass());
- validateTxAbortedState(tx);
+ validateTxFinished(tx);
}
/**
@@ -322,13 +323,12 @@ public abstract class ItAbstractInternalTableScanTest
extends IgniteAbstractTest
assertEquals(StorageException.class,
unwrapCause(gotException.get()).getClass());
- validateTxAbortedState(tx);
+ validateTxFinished(tx);
}
- protected void validateTxAbortedState(InternalTransaction tx) {
- if (tx != null) {
- assertEquals(TxState.ABORTED, tx.state());
- }
+ protected void validateTxFinished(InternalTransaction tx) {
+ assertNotNull(tx);
+ assertNull(tx.state());
}
/**
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
index 68b864af23a..796a5166c06 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItInternalTableReadOnlyScanTest.java
@@ -57,7 +57,7 @@ public class ItInternalTableReadOnlyScanTest extends
ItAbstractInternalTableScan
}
@Override
- protected void validateTxAbortedState(InternalTransaction tx) {
+ protected void validateTxFinished(InternalTransaction tx) {
// noop since we do not store state for readonly transactions.
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
index 6f4655090f0..51a070c1251 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxStateLocalMapTest.java
@@ -124,22 +124,22 @@ public class ItTxStateLocalMapTest extends
IgniteAbstractTest {
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testUpsert(boolean commit) {
- testTransaction(tx -> table.recordView().upsert(tx, makeValue(1, 1)),
true, commit);
+ testTransaction(tx -> table.recordView().upsert(tx, makeValue(1, 1)),
true, commit, false);
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testGet(boolean commit) {
- testTransaction(tx -> table.recordView().get(tx, makeKey(1)), false,
commit);
+ testTransaction(tx -> table.recordView().get(tx, makeKey(1)), false,
commit, true);
}
@ParameterizedTest
@ValueSource(booleans = { true, false })
public void testUpdateAll(boolean commit) {
- testTransaction(tx -> table.recordView().upsertAll(tx,
List.of(makeValue(1, 1), makeValue(2, 2))), true, commit);
+ testTransaction(tx -> table.recordView().upsertAll(tx,
List.of(makeValue(1, 1), makeValue(2, 2))), true, commit, false);
}
- private void testTransaction(Consumer<Transaction> touchOp, boolean
checkAfterTouch, boolean commit) {
+ private void testTransaction(Consumer<Transaction> touchOp, boolean
checkAfterTouch, boolean commit, boolean read) {
InternalClusterNode coord =
testCluster.cluster.get(0).topologyService().localMember();
UUID coordinatorId = coord.id();
@@ -160,17 +160,21 @@ public class ItTxStateLocalMapTest extends
IgniteAbstractTest {
tx.rollback();
}
- checkLocalTxStateOnNodes(
- tx.id(),
- new TxStateMeta(
- commit ? COMMITTED : ABORTED,
- coordinatorId,
- tx.commitPartition(),
- commit ?
testCluster.clockServices.get(coord.name()).now() : null,
- null,
- null
- )
- );
+ if (read) {
+ checkLocalTxStateOnNodes(tx.id(), null);
+ } else {
+ checkLocalTxStateOnNodes(
+ tx.id(),
+ new TxStateMeta(
+ commit ? COMMITTED : ABORTED,
+ coordinatorId,
+ tx.commitPartition(),
+ commit ?
testCluster.clockServices.get(coord.name()).now() : null,
+ null,
+ null
+ )
+ );
+ }
}
private void checkLocalTxStateOnNodes(UUID txId, TxStateMeta expected) {
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index 6cea81e1e53..8d09a82ce9d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -233,6 +233,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
ReplicationGroupId commitPartition,
boolean commitIntent,
boolean timeoutExceeded,
+ boolean recovery,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 8f418d59651..bd2e079ec7d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -679,7 +679,7 @@ public class InternalTableImpl implements InternalTable {
if (req.isWrite()) {
// Track only write requests from explicit transactions.
- if (!tx.remote() && !transactionInflights.addInflight(tx.id(),
false)) {
+ if (!tx.remote() &&
!transactionInflights.addInflight(tx.id())) {
int code = TX_ALREADY_FINISHED_ERR;
if (tx.isRolledBackWithTimeoutExceeded()) {
code = TX_ALREADY_FINISHED_WITH_TIMEOUT_ERR;
@@ -2136,7 +2136,7 @@ public class InternalTableImpl implements InternalTable {
@Override
public void onRequestBegin() {
// Track read only requests which are able to create cursors.
- if (!transactionInflights.addInflight(txId, true)) {
+ if (!transactionInflights.addScanInflight(txId)) {
throw new TransactionException(TX_ALREADY_FINISHED_ERR, format(
"Transaction is already finished () [txId={},
readOnly=true].",
txId
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index 442b02a220c..fe0968f689d 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -763,7 +763,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
// TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove this
test when zone colocation will be the only implementation.
public void testTxStateReplicaRequestEmptyState() throws Exception {
doAnswer(invocation -> {
- UUID txId = invocation.getArgument(5);
+ UUID txId = invocation.getArgument(6);
txManager.updateTxMeta(txId, old -> new TxStateMeta(
ABORTED,
@@ -775,7 +775,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
));
return nullCompletedFuture();
- }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
any(), any());
+ }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), any(), any());
CompletableFuture<ReplicaResult> fut =
partitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(tablePartitionIdMessage(grpId))
@@ -2335,7 +2335,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
doAnswer(invocation -> nullCompletedFuture())
- .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), any(), any());
+ .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).cleanup(any(), anyString(), any());
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index bacda373b05..39715e0b950 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -961,7 +961,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
doAnswer(invocation -> nullCompletedFuture())
- .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), any(), any());
+ .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).cleanup(any(), anyString(), any());
}
@@ -1340,7 +1340,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
@Test
public void testTxStateReplicaRequestEmptyState() throws Exception {
doAnswer(invocation -> {
- UUID txId = invocation.getArgument(5);
+ UUID txId = invocation.getArgument(6);
txManager.updateTxMeta(txId, old -> new TxStateMeta(
ABORTED,
@@ -1352,7 +1352,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
));
return nullCompletedFuture();
- }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
any(), any());
+ }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), any(), any());
CompletableFuture<ReplicaResult> fut =
zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(tablePartitionIdMessage(grpId))
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index 22974aaf3bd..180be47cce7 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -149,7 +149,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
);
});
- lenient().when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), any(), any())).thenReturn(nullCompletedFuture());
+ lenient().when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any()))
+ .thenReturn(nullCompletedFuture());
lenient().when(replicaService.invoke(anyString(),
any())).then(invocation -> {
ReplicaRequest request = invocation.getArgument(1);
@@ -392,7 +393,7 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
private Map<ReplicationGroupId, PendingTxPartitionEnlistment>
extractEnlistmentsFromTxFinish() {
ArgumentCaptor<Map<ReplicationGroupId, PendingTxPartitionEnlistment>>
enlistmentsCaptor = ArgumentCaptor.captor();
- verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
enlistmentsCaptor.capture(), any());
+ verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), enlistmentsCaptor.capture(), any());
return enlistmentsCaptor.getValue();
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
index b758fdbd0dd..d16b6fed98b 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItTxCleanupTest.java
@@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
@@ -75,6 +76,12 @@ class ItTxCleanupTest extends ClusterPerTestIntegrationTest {
}
});
+ if (readsOnly) {
+ // We shouldn't see write intent switching from RW-R a transaction.
+ assertFalse(waitForCondition(() ->
writeIntentSwitchRequestCount.get() > 0, SECONDS.toMillis(1)));
+ return;
+ }
+
// We should see one WI switch...
assertTrue(waitForCondition(() -> writeIntentSwitchRequestCount.get()
> 0, SECONDS.toMillis(10)));
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
index 4a4ce23cd2b..7bfeb9e7848 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
@@ -760,6 +760,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
commitPartition,
false,
false,
+ true,
Map.of(commitPartition, new
PendingTxPartitionEnlistment(txCrdNode2.node().name(), 0L)),
rwTx1Id
);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 3c6b77a79ec..23f38e89748 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -176,6 +176,7 @@ public interface TxManager extends IgniteComponent {
* @param commitPartition Partition to store a transaction state. {@code
null} if nothing was enlisted into the transaction.
* @param commitIntent {@code true} if a commit requested.
* @param timeoutExceeded {@code true} if a timeout exceeded.
+ * @param recovery {@code true} if finished by recovery.
* @param enlistedGroups Map of enlisted partitions.
* @param txId Transaction id.
*/
@@ -184,6 +185,7 @@ public interface TxManager extends IgniteComponent {
@Nullable ReplicationGroupId commitPartition,
boolean commitIntent,
boolean timeoutExceeded,
+ boolean recovery,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
);
@@ -193,7 +195,7 @@ public interface TxManager extends IgniteComponent {
*
* <p>The nodes to send the request to are taken from the mapping
`partition id -> partition primary`.
*
- * @param commitPartitionId Commit partition id.
+ * @param commitPartitionId Commit partition id. {@code Null} for unlock
only path.
* @param enlistedPartitions Map of enlisted partitions.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -201,7 +203,7 @@ public interface TxManager extends IgniteComponent {
* @return Completable future of Void.
*/
CompletableFuture<Void> cleanup(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -213,7 +215,7 @@ public interface TxManager extends IgniteComponent {
*
* <p>The nodes to sends the request to are calculated by the placement
driver.
*
- * @param commitPartitionId Commit partition id.
+ * @param commitPartitionId Commit partition id. {@code Null} for unlock
only path.
* @param enlistedPartitions Enlisted partitions.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -221,7 +223,7 @@ public interface TxManager extends IgniteComponent {
* @return Completable future of Void.
*/
CompletableFuture<Void> cleanup(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
Collection<EnlistedPartitionGroup> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -229,7 +231,7 @@ public interface TxManager extends IgniteComponent {
);
/**
- * Sends cleanup request to the nodes than initiated recovery.
+ * Sends cleanup request to a node that had initiated the recovery.
*
* @param commitPartitionId Commit partition id.
* @param node Target node.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
index 44e2c014a78..5c85ffefbe8 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImpl.java
@@ -152,8 +152,8 @@ public class ReadOnlyTransactionImpl extends
IgniteAbstractTransactionImpl {
((TxManagerImpl) txManager).onCompleteReadOnlyTransaction(
commitIntent,
- new TxIdAndTimestamp(readTimestamp, id()),
- timeoutExceeded);
+ new TxIdAndTimestamp(readTimestamp, id())
+ );
this.timeoutExceeded = timeoutExceeded;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 284da3f73f2..77e071bce49 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -254,6 +254,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
commitPart,
commit,
timeoutExceeded,
+ false,
enlisted,
id()
);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 4dc2230724f..5e8f4e4511f 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionResult;
import org.apache.ignite.internal.tx.message.FinishedTransactionsBatchMessage;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Contains counters for in-flight requests of the transactions. Read-write
transactions can't finish when some requests are in-flight.
@@ -65,18 +66,17 @@ public class TransactionInflights {
}
/**
- * Registers the inflight update for a transaction.
+ * Register the update inflight for RW transaction.
*
* @param txId The transaction id.
- * @param readOnly Whether the transaction is read-only.
* @return {@code True} if the inflight was registered. The update must be
failed on false.
*/
- public boolean addInflight(UUID txId, boolean readOnly) {
+ public boolean addInflight(UUID txId) {
boolean[] res = {true};
txCtxMap.compute(txId, (uuid, ctx) -> {
if (ctx == null) {
- ctx = readOnly ? new ReadOnlyTxContext() : new
ReadWriteTxContext(placementDriver, clockService);
+ ctx = new ReadWriteTxContext(placementDriver, clockService);
}
res[0] = ctx.addInflight();
@@ -87,6 +87,74 @@ public class TransactionInflights {
return res[0];
}
+ /**
+ * Register the scan inflight for RO transaction.
+ *
+ * @param txId The transaction id.
+ * @return {@code True} if the inflight was registered. The scan must be
failed on false.
+ */
+ public boolean addScanInflight(UUID txId) {
+ boolean[] res = {true};
+
+ txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new ReadOnlyTxContext();
+ }
+
+ res[0] = ctx.addInflight();
+
+ return ctx;
+ });
+
+ return res[0];
+ }
+
+ /**
+ * Track the given RW transaction until finish.
+ * Currently RW tracking is used to enforce cleanup path for SQL RW
transactions, which doesn't use RW inflights tracking yet.
+ *
+ * @param txId The transaction id.
+ * @return {@code True} if the was registered and is in active state.
+ */
+ public boolean track(UUID txId) {
+ boolean[] res = {true};
+
+ txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new ReadWriteTxContext(placementDriver, clockService);
+ }
+
+ res[0] = !ctx.isTxFinishing();
+
+ return ctx;
+ });
+
+ return res[0];
+ }
+
+ /**
+ * Track the given RO transaction until finish.
+ * Currently RO tracking is used to prevent unclosed cursors.
+ *
+ * @param txId The transaction id.
+ * @return {@code True} if the was registered and is in active state.
+ */
+ public boolean trackReadOnly(UUID txId) {
+ boolean[] res = {true};
+
+ txCtxMap.compute(txId, (uuid, ctx) -> {
+ if (ctx == null) {
+ ctx = new ReadOnlyTxContext();
+ }
+
+ res[0] = !ctx.isTxFinishing();
+
+ return ctx;
+ });
+
+ return res[0];
+ }
+
/**
* Unregisters the inflight for a transaction.
*
@@ -106,6 +174,22 @@ public class TransactionInflights {
}
}
+ /**
+ * Get active inflights.
+ *
+ * @return {@code True} if has some inflights in progress.
+ */
+ @TestOnly
+ public boolean hasActiveInflights() {
+ for (TxContext value : txCtxMap.values()) {
+ if (!value.isTxFinishing()) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
Collection<UUID> finishedReadOnlyTransactions() {
return txCtxMap.entrySet().stream()
.filter(e -> e.getValue() instanceof ReadOnlyTxContext &&
e.getValue().isReadyToFinish())
@@ -137,13 +221,15 @@ public class TransactionInflights {
}
}
- void markReadOnlyTxFinished(UUID txId, boolean timeoutExceeded) {
+ void markReadOnlyTxFinished(UUID txId) {
txCtxMap.compute(txId, (k, ctx) -> {
if (ctx == null) {
- ctx = new ReadOnlyTxContext(timeoutExceeded);
+ ctx = new ReadOnlyTxContext();
+ } else {
+ assert ctx instanceof ReadOnlyTxContext;
}
- ctx.finishTx(null, timeoutExceeded);
+ ctx.finishTx(null);
return ctx;
});
@@ -152,12 +238,12 @@ public class TransactionInflights {
ReadWriteTxContext lockTxForNewUpdates(UUID txId, Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups) {
return (ReadWriteTxContext) txCtxMap.compute(txId, (uuid, tuple0) -> {
if (tuple0 == null) {
- tuple0 = new ReadWriteTxContext(placementDriver, clockService,
false); // No writes enlisted.
+ tuple0 = new ReadWriteTxContext(placementDriver, clockService,
true); // No writes enlisted, can go with unlock only.
}
assert !tuple0.isTxFinishing() : "Transaction is already finished
[id=" + uuid + "].";
- tuple0.finishTx(enlistedGroups, false);
+ tuple0.finishTx(enlistedGroups);
return tuple0;
});
@@ -185,13 +271,11 @@ public class TransactionInflights {
abstract void onInflightsRemoved();
- abstract void finishTx(@Nullable Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups, boolean timeoutExceeded);
+ abstract void finishTx(@Nullable Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups);
abstract boolean isTxFinishing();
abstract boolean isReadyToFinish();
-
- abstract boolean isTimeoutExceeded();
}
/**
@@ -203,23 +287,18 @@ public class TransactionInflights {
*/
private static class ReadOnlyTxContext extends TxContext {
private volatile boolean markedFinished;
- private volatile boolean timeoutExceeded;
ReadOnlyTxContext() {
// No-op.
}
- ReadOnlyTxContext(boolean timeoutExceeded) {
- this.timeoutExceeded = timeoutExceeded;
- }
-
@Override
public void onInflightsRemoved() {
// No-op.
}
@Override
- public void finishTx(@Nullable Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups, boolean timeoutExceeded) {
+ public void finishTx(@Nullable Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups) {
markedFinished = true;
}
@@ -233,11 +312,6 @@ public class TransactionInflights {
return markedFinished && inflights == 0;
}
- @Override
- boolean isTimeoutExceeded() {
- return timeoutExceeded;
- }
-
@Override
public String toString() {
return "ReadOnlyTxContext [inflights=" + inflights + ']';
@@ -247,19 +321,19 @@ public class TransactionInflights {
static class ReadWriteTxContext extends TxContext {
private final CompletableFuture<Void> waitRepFut = new
CompletableFuture<>();
private final PlacementDriver placementDriver;
+ private final boolean noWrites;
private volatile CompletableFuture<Void> finishInProgressFuture = null;
private volatile Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups;
private final ClockService clockService;
- private volatile boolean timeoutExceeded;
private ReadWriteTxContext(PlacementDriver placementDriver,
ClockService clockService) {
this(placementDriver, clockService, false);
}
- private ReadWriteTxContext(PlacementDriver placementDriver,
ClockService clockService, boolean timeoutExceeded) {
+ private ReadWriteTxContext(PlacementDriver placementDriver,
ClockService clockService, boolean noWrites) {
this.placementDriver = placementDriver;
this.clockService = clockService;
- this.timeoutExceeded = timeoutExceeded;
+ this.noWrites = noWrites;
}
CompletableFuture<Void> performFinish(boolean commit,
Function<Boolean, CompletableFuture<Void>> finishAction) {
@@ -326,8 +400,7 @@ public class TransactionInflights {
});
}
- return allOfToList(futures)
- .thenCompose(unused -> waitNoInflights());
+ return allOfToList(futures).thenCompose(unused ->
waitNoInflights());
} else {
return nullCompletedFuture();
}
@@ -352,9 +425,8 @@ public class TransactionInflights {
}
@Override
- public void finishTx(Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups, boolean timeoutExceeded) {
+ public void finishTx(Map<ReplicationGroupId,
PendingTxPartitionEnlistment> enlistedGroups) {
this.enlistedGroups = enlistedGroups;
- this.timeoutExceeded = timeoutExceeded;
finishInProgressFuture = new CompletableFuture<>();
}
@@ -368,15 +440,14 @@ public class TransactionInflights {
return waitRepFut.isDone();
}
- @Override
- boolean isTimeoutExceeded() {
- return timeoutExceeded;
+ boolean isNoWrites() {
+ return noWrites;
}
@Override
public String toString() {
return "ReadWriteTxContext [inflights=" + inflights + ",
waitRepFut=" + waitRepFut
- + ", finishFut=" + finishInProgressFuture + ']';
+ + ", noWrites=" + noWrites + ", finishFut=" +
finishInProgressFuture + ']';
}
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index e621115166d..52f059ccdaf 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -151,7 +151,7 @@ public class TxCleanupRequestSender {
/**
* Sends cleanup request to the primary nodes of each one of {@code
partitions}.
*
- * @param commitPartitionId Commit partition id.
+ * @param commitPartitionId Commit partition id. {@code Null} for unlock
only path.
* @param enlistedPartitions Map of enlisted partitions.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -159,17 +159,19 @@ public class TxCleanupRequestSender {
* @return Completable future of Void.
*/
public CompletableFuture<Void> cleanup(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
// Start tracking the partitions we want to learn the replication
confirmation from.
- writeIntentsReplicated.put(
- txId,
- new CleanupContext(commitPartitionId,
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
- );
+ if (commitPartitionId != null) {
+ writeIntentsReplicated.put(
+ txId,
+ new CleanupContext(commitPartitionId,
enlistedPartitions.keySet(), commit ? TxState.COMMITTED : TxState.ABORTED)
+ );
+ }
Map<String, List<EnlistedPartitionGroup>> partitionsByPrimaryName =
new HashMap<>();
enlistedPartitions.forEach((partitionId, partition) -> {
@@ -186,7 +188,7 @@ public class TxCleanupRequestSender {
/**
* Gets primary nodes for each of the provided {@code partitions} and
sends cleanup request to each one.
*
- * @param commitPartitionId Commit partition id.
+ * @param commitPartitionId Commit partition id. {@code Null} for unlock
only path.
* @param partitions Collection of enlisted partitions.
* @param commit {@code true} if a commit requested.
* @param commitTimestamp Commit timestamp ({@code null} if it's an abort).
@@ -194,7 +196,7 @@ public class TxCleanupRequestSender {
* @return Completable future of Void.
*/
public CompletableFuture<Void> cleanup(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
Collection<EnlistedPartitionGroup> partitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -203,11 +205,14 @@ public class TxCleanupRequestSender {
Map<ReplicationGroupId, EnlistedPartitionGroup> partitionIds =
partitions.stream()
.collect(toMap(EnlistedPartitionGroup::groupId, identity()));
- // Start tracking the partitions we want to learn the replication
confirmation from.
- writeIntentsReplicated.put(
- txId,
- new CleanupContext(commitPartitionId, new
HashSet<>(partitionIds.keySet()), commit ? TxState.COMMITTED : TxState.ABORTED)
- );
+ if (commitPartitionId != null) {
+ // Start tracking the partitions we want to learn the replication
confirmation from.
+ writeIntentsReplicated.put(
+ txId,
+ new CleanupContext(commitPartitionId, new
HashSet<>(partitionIds.keySet()),
+ commit ? TxState.COMMITTED : TxState.ABORTED)
+ );
+ }
return placementDriverHelper.findPrimaryReplicas(partitionIds.keySet())
.thenCompose(partitionData -> {
@@ -245,7 +250,7 @@ public class TxCleanupRequestSender {
}
private void cleanupPartitionsWithoutPrimary(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
@@ -267,7 +272,7 @@ public class TxCleanupRequestSender {
}
private CompletableFuture<Void> cleanupPartitions(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
Map<String, List<EnlistedPartitionGroup>> partitionsByNode,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
@@ -279,14 +284,15 @@ public class TxCleanupRequestSender {
String node = entry.getKey();
List<EnlistedPartitionGroup> nodePartitions = entry.getValue();
-
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node, nodePartitions));
+
cleanupFutures.add(sendCleanupMessageWithRetries(commitPartitionId, commit,
commitTimestamp, txId, node,
+ commitPartitionId == null ? null : nodePartitions));
}
return allOf(cleanupFutures.toArray(new CompletableFuture<?>[0]));
}
private CompletableFuture<Void> sendCleanupMessageWithRetries(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId,
@@ -305,7 +311,7 @@ public class TxCleanupRequestSender {
// or will run `switchWriteIntentsOnPartitions`
for partitions with no primary.
// At the end of the day all write intents will be
properly converted.
if (partitions == null) {
- // If we don't have any partition, which is
the recovery case,
+ // If we don't have any partition, which is
the recovery or unlock only case,
// just try again with the same node.
return
sendCleanupMessageWithRetries(commitPartitionId, commit, commitTimestamp, txId,
node, partitions);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 890885baed2..85253160aa1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -489,6 +489,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
// no need to register them.
// TODO: https://issues.apache.org/jira/browse/IGNITE-24229 - schedule
expiration for multi-key implicit transactions?
if (!implicit) {
+ // TODO IGNITE-26531 Unregister is not called on finish.
transactionExpirationRegistry.register(transaction);
if (isStopping) {
@@ -624,6 +625,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Nullable ReplicationGroupId commitPartition,
boolean commitIntent,
boolean timeout,
+ boolean recovery,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
) {
@@ -695,7 +697,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
commit,
enlistedGroups,
txId,
- finishingStateMeta.txFinishFuture()
+ finishingStateMeta.txFinishFuture(),
+ txContext.isNoWrites() && !recovery
)
).whenComplete((unused, throwable) -> {
if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
@@ -731,7 +734,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
boolean commit,
Map<ReplicationGroupId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId,
- CompletableFuture<TransactionMeta> txFinishFuture
+ CompletableFuture<TransactionMeta> txFinishFuture,
+ boolean unlock
) {
HybridTimestamp commitTimestamp = commitTimestamp(commit);
// In case of commit it's required to check whether current primaries
are still the same that were enlisted and whether
@@ -747,6 +751,27 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
Map<ReplicationGroupId, PartitionEnlistment>
groups = enlistedGroups.entrySet().stream()
.collect(toMap(Entry::getKey,
Entry::getValue));
+ if (unlock) {
+ return txCleanupRequestSender.cleanup(null,
groups, verifiedCommit, commitTimestamp, txId)
+ .thenAccept(ignored -> {
+ // Don't keep useless state.
+
txStateVolatileStorage.updateMeta(txId, old -> null);
+
+ TxStateMeta meta = new TxStateMeta(
+ verifiedCommit ? COMMITTED
: ABORTED,
+ localNodeId,
+ null,
+ commitTimestamp,
+ null,
+ null,
+ System.currentTimeMillis(),
+ null
+ );
+
+ txFinishFuture.complete(meta);
+ });
+ }
+
return durableFinish(
observableTimestampTracker,
commitPartition,
@@ -1095,13 +1120,12 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public CompletableFuture<Void> cleanup(
- ReplicationGroupId commitPartitionId,
+ @Nullable ReplicationGroupId commitPartitionId,
Map<ReplicationGroupId, PartitionEnlistment> enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
) {
- assertReplicationGroupType(commitPartitionId);
for (ReplicationGroupId replicationGroupId :
enlistedPartitions.keySet()) {
assertReplicationGroupType(replicationGroupId);
}
@@ -1169,12 +1193,12 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return runAsync(runnable, writeIntentSwitchPool);
}
- void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp
txIdAndTimestamp, boolean timeoutExceeded) {
+ void onCompleteReadOnlyTransaction(boolean commitIntent, TxIdAndTimestamp
txIdAndTimestamp) {
UUID txId = txIdAndTimestamp.getTxId();
txMetrics.onReadOnlyTransactionFinished(txId, commitIntent);
- transactionInflights.markReadOnlyTxFinished(txId, timeoutExceeded);
+ transactionInflights.markReadOnlyTxFinished(txId);
}
@Override
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
index b6f6bd3a5d4..f7853879a69 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/TxManagerTest.java
@@ -164,6 +164,8 @@ public class TxManagerTest extends IgniteAbstractTest {
private final TestLowWatermark lowWatermark = spy(new TestLowWatermark());
+ private TransactionInflights transactionInflights;
+
@BeforeEach
public void setup() {
clusterService = mock(ClusterService.class, RETURNS_DEEP_STUBS);
@@ -178,7 +180,7 @@ public class TxManagerTest extends IgniteAbstractTest {
RemotelyTriggeredResourceRegistry resourceRegistry = new
RemotelyTriggeredResourceRegistry();
- TransactionInflights transactionInflights = new
TransactionInflights(placementDriver, clockService);
+ transactionInflights = new TransactionInflights(placementDriver,
clockService);
txManager = new TxManagerImpl(
txConfiguration,
@@ -350,6 +352,7 @@ public class TxManagerTest extends IgniteAbstractTest {
tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
tx.assignCommitPartition(replicationGroupId);
+ transactionInflights.track(tx.id()); // Enable cleanup path.
tx.commit();
@@ -371,6 +374,7 @@ public class TxManagerTest extends IgniteAbstractTest {
tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
tx.assignCommitPartition(replicationGroupId);
+ transactionInflights.track(tx.id()); // Enable cleanup path.
tx.rollback();
@@ -397,6 +401,7 @@ public class TxManagerTest extends IgniteAbstractTest {
ReplicationGroupId replicationGroupId = targetReplicationGroupId(1, 0);
+ transactionInflights.track(tx.id());
tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
tx.assignCommitPartition(replicationGroupId);
@@ -425,6 +430,7 @@ public class TxManagerTest extends IgniteAbstractTest {
ReplicationGroupId replicationGroupId = targetReplicationGroupId(1, 0);
+ transactionInflights.track(tx.id());
tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
tx.assignCommitPartition(replicationGroupId);
@@ -834,6 +840,8 @@ public class TxManagerTest extends IgniteAbstractTest {
tx.enlist(replicationGroupId, 10, REMOTE_NODE.name(), 1L);
tx.assignCommitPartition(replicationGroupId);
+ transactionInflights.track(tx.id());
+
return tx;
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index 98ccb28dd07..b86ff8bb5a1 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -104,8 +104,8 @@ class ReadWriteTransactionImplTest extends
BaseIgniteAbstractTest {
private void startTxAndTryToEnlist(boolean commit) {
HashSet<UUID> finishedTxs = new HashSet<>();
- Mockito.when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), any(), any())).thenAnswer(invocation -> {
- finishedTxs.add(invocation.getArgument(5));
+ Mockito.when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any())).thenAnswer(invocation -> {
+ finishedTxs.add(invocation.getArgument(6));
return nullCompletedFuture();
});