This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6ce004dc6c3 IGNITE-27617 Ensure directly mapped client transaction
write intents are cleaned up
6ce004dc6c3 is described below
commit 6ce004dc6c375511ec06c1d0c3b573ca3f1f7f6e
Author: Alexey Scherbakov <[email protected]>
AuthorDate: Wed Jan 21 16:16:49 2026 +0300
IGNITE-27617 Ensure directly mapped client transaction write intents are
cleaned up
---
.../client/proto/ProtocolBitmaskFeature.java | 7 +-
.../ignite/client/handler/ItClientHandlerTest.java | 1 +
.../ignite/client/handler/ClientHandlerModule.java | 3 +-
.../handler/ClientInboundMessageHandler.java | 5 +-
.../tx/ClientTransactionCommitRequest.java | 10 +-
.../tx/ClientTransactionRollbackRequest.java | 14 +-
.../client/ClientTransactionInflights.java | 10 +
.../ignite/internal/client/TcpClientChannel.java | 3 +-
.../internal/client/tx/ClientTransaction.java | 3 +
.../apache/ignite/client/fakes/FakeTxManager.java | 1 +
.../partition/replicator/TxRecoveryEngine.java | 1 +
.../internal/client/ItClientDirectMappingTest.java | 231 ++++++++++++++++-----
.../ignite/internal/table/ItColocationTest.java | 1 +
.../replication/PartitionReplicaListenerTest.java | 2 +-
.../ZonePartitionReplicaListenerTest.java | 6 +-
.../distributed/storage/InternalTableImplTest.java | 4 +-
.../tx/distributed/ItTransactionRecoveryTest.java | 1 +
.../org/apache/ignite/internal/tx/TxManager.java | 2 +
.../internal/tx/impl/ReadWriteTransactionImpl.java | 15 ++
.../ignite/internal/tx/impl/TxManagerImpl.java | 3 +-
.../tx/impl/ReadWriteTransactionImplTest.java | 9 +-
21 files changed, 257 insertions(+), 75 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 8a4caa9a0ef..3e9e2417434 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -97,7 +97,12 @@ public enum ProtocolBitmaskFeature {
/**
* Compute tasks and jobs accept observable timestamp from the client.
*/
- COMPUTE_OBSERVABLE_TS(14);
+ COMPUTE_OBSERVABLE_TS(14),
+
+ /**
+ * Send remote writes flag for directly mapped transactions.
+ */
+ TX_DIRECT_MAPPING_SEND_REMOTE_WRITES(15);
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index b8ad0f0e118..cc5d65f275b 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -560,6 +560,7 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
expected.set(12);
expected.set(13);
expected.set(14);
+ expected.set(15);
assertEquals(expected, supportedFeatures);
var extensionsLen = unpacker.unpackInt();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 9389c76898a..0bab18395fe 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -98,7 +98,8 @@ public class ClientHandlerModule implements IgniteComponent,
PlatformComputeTran
ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
- ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
+ ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
+ ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
));
/** Connection id generator.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index a083b696403..8b89bbfb58a 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
@@ -949,11 +950,11 @@ public class ClientInboundMessageHandler
case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources,
metrics, clockService, igniteTables,
- clientContext.hasFeature(TX_PIGGYBACK), tsTracker);
+ clientContext.hasFeature(TX_PIGGYBACK),
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);
case ClientOp.TX_ROLLBACK:
return ClientTransactionRollbackRequest.process(in, resources,
metrics, igniteTables,
- clientContext.hasFeature(TX_PIGGYBACK));
+ clientContext.hasFeature(TX_PIGGYBACK),
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));
case ClientOp.COMPUTE_EXECUTE:
return ClientComputeExecuteRequest.process(in, compute,
clusterService, notificationSender(requestId), clientContext);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index 161281673eb..88896e09cc5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -36,6 +36,7 @@ import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.tx.TransactionException;
@@ -52,6 +53,7 @@ public class ClientTransactionCommitRequest {
* @param clockService Clock service.
* @param igniteTables Tables.
* @param enableDirectMapping Enable direct mapping flag.
+ * @param sendRemoteWritesFlag Send remote writes flag.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -61,6 +63,7 @@ public class ClientTransactionCommitRequest {
ClockService clockService,
IgniteTablesInternal igniteTables,
boolean enableDirectMapping,
+ boolean sendRemoteWritesFlag,
HybridTimestampTracker tsTracker
) throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
@@ -83,8 +86,13 @@ public class ClientTransactionCommitRequest {
if (cnt > 0) {
long causality = in.unpackLong();
- // Update causality.
+ // Update causality. Used to assign commit timestamp after all
enlistments.
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
+
+ ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+
+ // Enforce cleanup unless the client explicitly reports that it has no remote writes.
+ tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
}
Exception ex = null;
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
index 1ea53aad501..c6492dfa99a 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
@@ -28,6 +28,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
/**
* Client transaction rollback request.
@@ -41,6 +42,7 @@ public class ClientTransactionRollbackRequest {
* @param metrics Metrics.
* @param igniteTables Tables facade.
* @param enableDirectMapping Enable direct mapping.
+ * @param sendRemoteWritesFlag Send remote writes flag.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -48,7 +50,8 @@ public class ClientTransactionRollbackRequest {
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
IgniteTablesInternal igniteTables,
- boolean enableDirectMapping
+ boolean enableDirectMapping,
+ boolean sendRemoteWritesFlag
)
throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
@@ -70,6 +73,15 @@ public class ClientTransactionRollbackRequest {
merge(table.internalTable(), partId, consistentId, token,
tx, false);
}
}
+
+ if (cnt > 0) {
+ in.unpackLong(); // Unpack causality.
+
+ ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+
+ // Enforce cleanup unless the client explicitly reports that it has no remote writes.
+ tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
+ }
}
return tx.rollbackAsync().handle((res, err) -> {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
index 74b5512cd90..fbfeb1d9611 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
@@ -125,6 +125,16 @@ public class ClientTransactionInflights {
txCtxMap.remove(uuid);
}
+ /**
+ * Check if the inflights map contains a given transaction.
+ *
+ * @param txId Tx id.
+ * @return {@code True} if contains.
+ */
+ public boolean contains(UUID txId) {
+ return txCtxMap.containsKey(txId);
+ }
+
/**
* Transaction inflights context.
*/
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index b8ce92e43c5..b441fa19a7e 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -99,7 +99,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING,
ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
- ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS
+ ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
+ ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
));
/** Minimum supported heartbeat interval. */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index 74a7d8c3ac4..d1c31601921 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -314,6 +314,9 @@ public class ClientTransaction implements Transaction {
if (cnt > 0) {
w.out().packLong(tracker.get().longValue());
+
+ // Send information about directly mapped writes to ensure a
proper cleanup algorithm is chosen.
+ w.out().packBoolean(!ch.inflights().contains(txId));
}
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index d12b1de5e91..2f0b4851375 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -234,6 +234,7 @@ public class FakeTxManager implements TxManager {
boolean commitIntent,
boolean timeoutExceeded,
boolean recovery,
+ boolean noRemoteWrites,
Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups,
UUID txId
) {
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
index b2d452f0794..87706ef4cc1 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/TxRecoveryEngine.java
@@ -70,6 +70,7 @@ public class TxRecoveryEngine {
false,
false,
true,
+ false,
Map.of(replicationGroupId,
abandonedTxRecoveryEnlistmentFactory.apply(clusterNodeResolver.getById(senderId))),
txId
)
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
index b671b25ca24..ad39ceb7677 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/client/ItClientDirectMappingTest.java
@@ -19,26 +19,54 @@ package org.apache.ignite.internal.client;
import static
org.apache.ignite.internal.TestDefaultProfilesNames.DEFAULT_AIPERSIST_PROFILE_NAME;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
-import static org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTx;
-import static
org.apache.ignite.internal.tx.test.ItTransactionTestUtils.withTxVoid;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-import org.apache.ignite.InitParametersBuilder;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.ignite.client.IgniteClient;
-import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
-import org.apache.ignite.internal.client.table.ClientTable;
+import org.apache.ignite.client.IgniteClient.Builder;
+import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
+import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
+import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
import org.apache.ignite.table.Tuple;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.apache.ignite.table.partition.Partition;
+import org.apache.ignite.table.partition.PartitionDistribution;
+import org.apache.ignite.tx.TransactionOptions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
/**
* Test direct mapping for client's transactions.
*/
-public class ItClientDirectMappingTest extends ClusterPerTestIntegrationTest {
+public class ItClientDirectMappingTest extends ClusterPerClassIntegrationTest {
/** Table name. */
private static final String TABLE_NAME = "test_table";
+ private static final String ZONE_NAME = "test_zone";
+
+ /** Table name 2. */
+ private static final String TABLE_NAME_2 = "test_table_2";
+
+ private static final String ZONE_NAME2 = "test_zone2";
+
+ protected static final String COLUMN_KEY = "key";
+
+ protected static final String COLUMN_VAL = "val";
+
+ protected static final int PARTITIONS = 10;
+
/** Nodes bootstrap configuration pattern. */
private static final String NODE_BOOTSTRAP_CFG_TEMPLATE = "ignite {\n"
+ " network: {\n"
@@ -53,66 +81,116 @@ public class ItClientDirectMappingTest extends
ClusterPerTestIntegrationTest {
+ " failureHandler.dumpThreadsOnFailure: false\n"
+ "}";
- @BeforeEach
- public void setup() throws Exception {
- String zoneSql = "create zone test_zone with partitions=5, replicas=1,
storage_profiles='" + DEFAULT_AIPERSIST_PROFILE_NAME + "'";
- String sql = "create table " + TABLE_NAME + " (key int primary key,
val varchar(20)) zone TEST_ZONE";
+ @BeforeAll
+ public static void setup() throws Exception {
+ String zoneSql = "create zone " + ZONE_NAME + " with partitions=" +
PARTITIONS + ", replicas=2, storage_profiles='"
+ + DEFAULT_AIPERSIST_PROFILE_NAME + "'";
+ String sql = "create table " + TABLE_NAME + " (key int primary key,
val varchar(20)) zone " + ZONE_NAME;
+ String zoneSql2 = "create zone " + ZONE_NAME2 + " with partitions=" +
PARTITIONS + ", replicas=2, storage_profiles='"
+ + DEFAULT_AIPERSIST_PROFILE_NAME + "'";
+ String sql2 = "create table " + TABLE_NAME_2 + " (key int primary key,
val varchar(20)) zone " + ZONE_NAME2;
- cluster.doInSession(0, session -> {
+ CLUSTER.doInSession(0, session -> {
executeUpdate(zoneSql, session);
executeUpdate(sql, session);
+ executeUpdate(zoneSql2, session);
+ executeUpdate(sql2, session);
});
}
- @Override
- protected void customizeInitParameters(InitParametersBuilder builder) {
- super.customizeInitParameters(builder);
-
- builder.clusterConfiguration("ignite {"
- + " transaction: {"
- + " readOnlyTimeoutMillis: 30000,"
- + " readWriteTimeoutMillis: 30000"
- + " },"
- + " replication: {"
- + " rpcTimeoutMillis: 30000"
- + " },"
- + "}");
- }
-
- @Test
- public void testBasicImplicit() {
- try (IgniteClient client0 = clientConnectedToNode(0)) {
- ClientTable table = (ClientTable)
client0.tables().table(TABLE_NAME);
- KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
-
- Tuple key = Tuple.create().set("key", 0);
- Tuple val = Tuple.create().set("val", "test0");
+ @ParameterizedTest
+ @ValueSource(ints = {0, 1, 2})
+ public void testReadOnCoordinatorWithDirectWrite(int testMode) {
+ TestMode mode = TestMode.values()[testMode];
+ boolean commit = mode == TestMode.COMMIT || mode == TestMode.COMPAT;
+ if (mode == TestMode.COMPAT) {
+ BitSet features = IgniteTestUtils.getFieldValue(null,
TcpClientChannel.class, "SUPPORTED_FEATURES");
+
features.clear(ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES.featureId());
+ }
- kvView.put(null, key, val);
- Tuple val0 = kvView.get(null, key);
- assertTrue(Tuple.equals(val, val0));
+ try (IgniteClient client = clientConnectedToAllNodes()) {
+ Table table1 = client.tables().table(TABLE_NAME);
+ KeyValueView<Tuple, Tuple> view1 = table1.keyValueView();
+ Table table2 = client.tables().table(TABLE_NAME_2);
+ KeyValueView<Tuple, Tuple> view2 = table2.keyValueView();
+
+ Map<Partition, ClusterNode> map1 =
table1.partitionDistribution().primaryReplicasAsync().join();
+ Map<Integer, ClusterNode> mapPartById1 =
map1.entrySet().stream().collect(Collectors.toMap(
+ entry -> Math.toIntExact(entry.getKey().id()),
+ Entry::getValue
+ ));
+
+ Map<Partition, ClusterNode> map2 =
table2.partitionDistribution().primaryReplicasAsync().join();
+ Map<Integer, ClusterNode> mapPartById2 =
map2.entrySet().stream().collect(Collectors.toMap(
+ entry -> Math.toIntExact(entry.getKey().id()),
+ Entry::getValue
+ ));
+
+ // Find a partition which is mapped to different primaries.
+ int targetPart = -1;
+
+ for (int i = 0; i < PARTITIONS; i++) {
+ ClusterNode node1 = mapPartById1.get(i);
+ ClusterNode node2 = mapPartById2.get(i);
+
+ if (!node1.equals(node2)) {
+ targetPart = i;
+ break;
+ }
+ }
+
+ if (targetPart == -1) {
+ log.warn("Skipping test due to bad assignment");
+ return;
+ }
+
+ log.info("DBG: using partition " + targetPart);
+
+ // Tables have the same structure, can reuse keys.
+ List<Tuple> keys = generateKeysForPartition(commit ? 0 : 100, 1,
map1, targetPart, table1);
+
+ ClientLazyTransaction tx = (ClientLazyTransaction)
client.transactions().begin(new TransactionOptions().readOnly(false));
+
+ Tuple key = keys.get(0);
+ // Enlist read operation on coordinator (proxy mode).
+ if (view2.get(tx, key) != null) {
+ fail("Should never happen");
+
+ return;
+ }
+
+ // Enlist write operation on other node (direct mode).
+ view1.put(tx, key, val("test" + key.intValue(0)));
+ if (commit) {
+ tx.commit();
+ } else {
+ tx.rollback();
+ }
+
+ if (commit) {
+ assertNotNull(view1.get(null, key), "key=" + key);
+ } else {
+ assertNull(view1.get(null, key), "key=" + key);
+ }
+ } finally {
+ if (mode == TestMode.COMPAT) {
+ BitSet features = IgniteTestUtils.getFieldValue(null,
TcpClientChannel.class, "SUPPORTED_FEATURES");
+
features.set(ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES.featureId());
+ }
}
}
- @Test
- public void testBasicExplicit() {
- try (IgniteClient client0 = clientConnectedToNode(0)) {
- ClientTable table = (ClientTable)
client0.tables().table(TABLE_NAME);
- KeyValueView<Tuple, Tuple> kvView = table.keyValueView();
-
- Tuple key = Tuple.create().set("key", 0);
- Tuple val = Tuple.create().set("val", "test0");
+ private IgniteClient clientConnectedToAllNodes() {
+ Builder builder = IgniteClient.builder();
+ String[] addresses = new String[initialNodes()];
- withTxVoid(client0.transactions(), tx -> kvView.put(tx, key, val));
- Tuple val0 = withTx(client0.transactions(), tx -> kvView.get(tx,
key));
- assertTrue(Tuple.equals(val, val0));
+ for (int i = 0; i < initialNodes(); i++) {
+ addresses[i] = "localhost:" + igniteImpl(i).clientAddress().port();
}
- }
- private IgniteClient clientConnectedToNode(int nodeIndex) {
- return IgniteClient.builder()
- .addresses("localhost:" +
igniteImpl(nodeIndex).clientAddress().port())
- .build();
+ builder.addresses(addresses);
+
+ return builder.build();
}
/**
@@ -129,4 +207,43 @@ public class ItClientDirectMappingTest extends
ClusterPerTestIntegrationTest {
protected int initialNodes() {
return 2;
}
+
+ private static List<Tuple> generateKeysForPartition(
+ int start,
+ int count,
+ Map<Partition, ClusterNode> map,
+ int partId,
+ Table table
+ ) {
+ List<Tuple> keys = new ArrayList<>();
+ PartitionDistribution partitionManager = table.partitionDistribution();
+
+ int k = start;
+ while (keys.size() != count) {
+ k++;
+ Tuple t = key(k);
+
+ Partition part = partitionManager.partitionAsync(t).orTimeout(5,
TimeUnit.SECONDS).join();
+
+ if (part.id() == partId) {
+ keys.add(t);
+ }
+ }
+
+ return keys;
+ }
+
+ private static Tuple val(String v) {
+ return Tuple.create().set(COLUMN_VAL, v);
+ }
+
+ private static Tuple key(Integer k) {
+ return Tuple.create().set(COLUMN_KEY, k);
+ }
+
+ enum TestMode {
+ COMMIT,
+ ROLLBACK,
+ COMPAT
+ }
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
index accc9b66b48..fd8bbc1477d 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItColocationTest.java
@@ -229,6 +229,7 @@ public class ItColocationTest extends
BaseIgniteAbstractTest {
boolean commitIntent,
boolean timeoutExceeded,
boolean recovery,
+ boolean noRemoteWrites,
Map<ZonePartitionId, PendingTxPartitionEnlistment>
enlistedGroups,
UUID txId
) {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index dee8e3fee0e..8fc68517c4c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -2030,7 +2030,7 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
doAnswer(invocation -> nullCompletedFuture())
- .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any());
+ .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).cleanup(any(), anyString(), any());
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
index 0ec3a7bfa22..ecffcacf052 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/ZonePartitionReplicaListenerTest.java
@@ -967,7 +967,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
doAnswer(invocation ->
nullCompletedFuture()).when(txManager).executeWriteIntentSwitchAsync(any(Runnable.class));
doAnswer(invocation -> nullCompletedFuture())
- .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any());
+ .when(txManager).finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), any(), any());
doAnswer(invocation -> nullCompletedFuture())
.when(txManager).cleanup(any(), anyString(), any());
}
@@ -1345,7 +1345,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
@Test
public void testTxStateReplicaRequestEmptyState() throws Exception {
doAnswer(invocation -> {
- UUID txId = invocation.getArgument(6);
+ UUID txId = invocation.getArgument(7);
txManager.updateTxMeta(txId, old -> TxStateMeta.builder(ABORTED)
.txCoordinatorId(localNode.id())
@@ -1354,7 +1354,7 @@ public class ZonePartitionReplicaListenerTest extends
IgniteAbstractTest {
);
return nullCompletedFuture();
- }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), any(), any());
+ }).when(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any());
CompletableFuture<ReplicaResult> fut =
zonePartitionReplicaListener.invoke(TX_MESSAGES_FACTORY.txStateCommitPartitionRequest()
.groupId(zonePartitionIdMessage(grpId))
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index bd55a59e981..f96ab30e4c0 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -164,7 +164,7 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
);
});
- lenient().when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any()))
+ lenient().when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), any(), any()))
.thenReturn(nullCompletedFuture());
// Mock for creating implicit transactions when null is passed
@@ -388,7 +388,7 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
private Map<ZonePartitionId, PendingTxPartitionEnlistment>
extractEnlistmentsFromTxFinish() {
ArgumentCaptor<Map<ZonePartitionId, PendingTxPartitionEnlistment>>
enlistmentsCaptor = ArgumentCaptor.captor();
- verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), enlistmentsCaptor.capture(), any());
+ verify(txManager).finish(any(), any(), anyBoolean(), anyBoolean(),
anyBoolean(), anyBoolean(), enlistmentsCaptor.capture(), any());
return enlistmentsCaptor.getValue();
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
index ab88e3c864b..9736942803d 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/tx/distributed/ItTransactionRecoveryTest.java
@@ -738,6 +738,7 @@ public class ItTransactionRecoveryTest extends
ClusterPerTestIntegrationTest {
false,
false,
true,
+ false,
Map.of(commitPartition, new
PendingTxPartitionEnlistment(txCrdNode2.node().name(), 0L)),
rwTx1Id
);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index 57e7525bb5d..bab8f783c8e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -183,6 +183,7 @@ public interface TxManager extends IgniteComponent {
* @param commitIntent {@code true} if a commit requested.
* @param timeoutExceeded {@code true} if a timeout exceeded.
* @param recovery {@code true} if finished by recovery.
+ * @param noRemoteWrites {@code true} if the remote (directly mapped) part of
this transaction has no writes.
* @param enlistedGroups Map of enlisted partitions.
* @param txId Transaction id.
*/
@@ -192,6 +193,7 @@ public interface TxManager extends IgniteComponent {
boolean commitIntent,
boolean timeoutExceeded,
boolean recovery,
+ boolean noRemoteWrites,
Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups,
UUID txId
);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index 6e58b3dd4f3..2cefc8e2756 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -64,6 +64,11 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
*/
private boolean killed;
+ /**
+ * {@code True} if the remote (directly mapped) part of this transaction has
no writes.
+ */
+ private boolean noRemoteWrites = true;
+
/**
* Constructs an explicit read-write transaction.
*
@@ -240,6 +245,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
commit,
timeoutExceeded,
false,
+ noRemoteWrites,
enlisted,
id()
);
@@ -310,4 +316,13 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
// Thread safety is not needed.
finishFuture = failedFuture(e);
}
+
+ /**
+ * Sets the flag telling whether the remote (directly mapped) part of this transaction has no writes.
+ *
+ * @param noRemoteWrites The value.
+ */
+ public void noRemoteWrites(boolean noRemoteWrites) {
+ this.noRemoteWrites = noRemoteWrites;
+ }
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 374b4e11cbf..6f25ad4e6b1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -636,6 +636,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
boolean commitIntent,
boolean timeout,
boolean recovery,
+ boolean noRemoteWrites,
Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedGroups,
UUID txId
) {
@@ -700,7 +701,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
enlistedGroups,
txId,
finishingStateMeta.txFinishFuture(),
- txContext.isNoWrites() && !recovery
+ txContext.isNoWrites() && noRemoteWrites && !recovery
)
).whenComplete((unused, throwable) -> {
if (localNodeId.equals(finishingStateMeta.txCoordinatorId())) {
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index 349cafd4efd..8a4c1a16864 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -94,11 +94,12 @@ class ReadWriteTransactionImplTest extends
BaseIgniteAbstractTest {
private void startTxAndTryToEnlist(boolean commit) {
HashSet<UUID> finishedTxs = new HashSet<>();
- Mockito.when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), any(), any())).thenAnswer(invocation -> {
- finishedTxs.add(invocation.getArgument(6));
+ Mockito.when(txManager.finish(any(), any(), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean(), any(), any()))
+ .thenAnswer(invocation -> {
+ finishedTxs.add(invocation.getArgument(7));
- return nullCompletedFuture();
- });
+ return nullCompletedFuture();
+ });
Mockito.when(txManager.stateMeta(any())).thenAnswer(invocation -> {
if (finishedTxs.contains(invocation.getArgument(0))) {