This is an automated email from the ASF dual-hosted git repository.
ptupitsyn pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3f0806bd54a IGNITE-27947 Fix rollback for client piggyback tx (#7844)
3f0806bd54a is described below
commit 3f0806bd54ad18c4b9c98870702aff79e2722aa2
Author: Tiago Marques Godinho <[email protected]>
AuthorDate: Wed Apr 22 12:21:17 2026 +0100
IGNITE-27947 Fix rollback for client piggyback tx (#7844)
Allow rollback for piggyback transactions: a piggyback client tx combines
**transaction start** with a **table operation** in one client request. If the
table operation is stuck on a lock, the client never receives the transaction
id. Instead, we use the `requestId` (generated on the client) to roll back the
transaction.
* Server-side:
* `TX_ROLLBACK` accepts the request id of the first ("piggyback") request
of a directly mapped TX. The request id is encoded in the negative range of
`resourceId`.
* Track `requestId -> txId` to roll back piggyback transactions stuck on
a lock
* Update all the operations to pass the `requestId` and the first-request
tracking map. RO ops take the same parameters just for consistency.
* Client-side:
* Allow multiple `onSent` callbacks on the payload output object.
* Add info to `ClientLazyTransaction` about the first request in the TX,
update via `PayloadOutputChannel` on successful request.
* Implement `TX_ROLLBACK` based on `firstReqId`
---
.../client/proto/ProtocolBitmaskFeature.java | 7 +-
.../ignite/client/handler/ItClientHandlerTest.java | 1 +
.../ignite/client/handler/ClientHandlerModule.java | 3 +-
.../handler/ClientInboundMessageHandler.java | 283 ++++++++++++++++++---
.../requests/sql/ClientSqlExecuteBatchRequest.java | 6 +-
.../requests/sql/ClientSqlExecuteRequest.java | 6 +-
.../sql/ClientSqlQueryMetadataRequest.java | 19 +-
.../handler/requests/table/ClientTableCommon.java | 40 ++-
.../table/ClientTupleContainsAllKeysRequest.java | 7 +-
.../table/ClientTupleContainsKeyRequest.java | 10 +-
.../table/ClientTupleDeleteAllExactRequest.java | 23 +-
.../table/ClientTupleDeleteAllRequest.java | 10 +-
.../table/ClientTupleDeleteExactRequest.java | 31 ++-
.../requests/table/ClientTupleDeleteRequest.java | 10 +-
.../requests/table/ClientTupleGetAllRequest.java | 7 +-
.../table/ClientTupleGetAndDeleteRequest.java | 10 +-
.../table/ClientTupleGetAndReplaceRequest.java | 23 +-
.../table/ClientTupleGetAndUpsertRequest.java | 23 +-
.../requests/table/ClientTupleGetRequest.java | 10 +-
.../table/ClientTupleInsertAllRequest.java | 22 +-
.../requests/table/ClientTupleInsertRequest.java | 23 +-
.../table/ClientTupleReplaceExactRequest.java | 11 +-
.../requests/table/ClientTupleReplaceRequest.java | 23 +-
.../requests/table/ClientTupleRequestBase.java | 9 +-
.../table/ClientTupleUpsertAllRequest.java | 23 +-
.../requests/table/ClientTupleUpsertRequest.java | 24 +-
.../requests/table/ClientTuplesRequestBase.java | 20 +-
.../table/DirectTransactionWithFirstRequest.java | 192 ++++++++++++++
.../tx/ClientTransactionCommitRequest.java | 3 +-
.../tx/ClientTransactionRollbackRequest.java | 59 +++--
.../client/ItThinClientTransactionsTest.java | 175 +++++++++++--
.../internal/client/PayloadOutputChannel.java | 14 +-
.../ignite/internal/client/TcpClientChannel.java | 8 +-
.../ignite/internal/client/sql/ClientSql.java | 31 +--
.../ignite/internal/client/table/ClientTable.java | 43 +---
.../internal/client/tx/ClientLazyTransaction.java | 68 ++++-
.../internal/client/tx/ClientTransaction.java | 2 +-
.../ignite/internal/client/tx/DirectTxUtils.java | 86 +++++++
38 files changed, 1164 insertions(+), 201 deletions(-)
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index de7ddd28298..16da6fda385 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -117,7 +117,12 @@ public enum ProtocolBitmaskFeature {
/**
* Client supports SQL_UPDATE_COUNTERS_2 error extension (single binary
value instead of array).
*/
- SQL_UPDATE_COUNTERS_2(18);
+ SQL_UPDATE_COUNTERS_2(18),
+
+ /**
+ * Allow rolling back direct transactions using the first request id.
+ */
+ TX_ROLLBACK_USING_FIRST_REQUEST(19);
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index b87b84c8ef1..ea38273ca24 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -565,6 +565,7 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
expected.set(16);
expected.set(17);
expected.set(18);
+ expected.set(19);
assertEquals(expected, supportedFeatures);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index b7e6a357fc6..52259fe9492 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -103,7 +103,8 @@ public class ClientHandlerModule implements
IgniteComponent, PlatformComputeTran
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
- ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
+ ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
+ ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
));
/** Connection id generator.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index f7581fb228b..baa7e788611 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -29,6 +29,7 @@ import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
@@ -96,6 +97,7 @@ import
org.apache.ignite.client.handler.requests.sql.ClientSqlQueryMetadataReque
import org.apache.ignite.client.handler.requests.table.ClientSchemasGetRequest;
import
org.apache.ignite.client.handler.requests.table.ClientStreamerBatchSendRequest;
import
org.apache.ignite.client.handler.requests.table.ClientStreamerWithReceiverBatchSendRequest;
+import org.apache.ignite.client.handler.requests.table.ClientTableCommon;
import
org.apache.ignite.client.handler.requests.table.ClientTableGetQualifiedRequest;
import org.apache.ignite.client.handler.requests.table.ClientTableGetRequest;
import
org.apache.ignite.client.handler.requests.table.ClientTablePartitionPrimaryReplicasGetRequest;
@@ -190,6 +192,7 @@ import org.apache.ignite.security.AuthenticationType;
import org.apache.ignite.security.exception.InvalidCredentialsException;
import
org.apache.ignite.security.exception.UnsupportedAuthenticationTypeException;
import org.apache.ignite.sql.SqlBatchException;
+import org.apache.ignite.table.IgniteTables;
import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -292,6 +295,24 @@ public class ClientInboundMessageHandler
private final HandshakeEventLoopSwitcher handshakeEventLoopSwitcher;
+ /**
+ * Tracks mappings between the first {@code requestId} and the {@code
resourceId}
+ * holding the Tx object for directly mapped transactions. The process is
not localized.
+ *
+ * <p><b>Mappings are created:</b>
+ * <ul>
+ * <li>When a direct transaction is created in {@link
ClientTableCommon#readTx(ClientMessageUnpacker, HybridTimestampTracker,
+ * ClientResourceRegistry, TxManager, IgniteTables,
NotificationSender, long[], long, Map)}.
+ * </li>
+ * </ul>
+ *
+ * <p><b>Mappings are removed:</b>
+ * <ul>
+ * <li>During a commit or rollback request. Hook is added at
creation.</li>
+ * </ul>
+ */
+ private final Map<Long, Long> firstReqToTxResMap = new
ConcurrentHashMap<>();
+
/**
* Constructor.
*
@@ -557,6 +578,7 @@ public class ClientInboundMessageHandler
actualFeatures.clear(TX_DELAYED_ACKS.featureId());
actualFeatures.clear(TX_PIGGYBACK.featureId());
actualFeatures.clear(TX_ALLOW_NOOP_ENLIST.featureId());
+ actualFeatures.clear(TX_ROLLBACK_USING_FIRST_REQUEST.featureId());
actualFeatures.clear(SQL_DIRECT_TX_MAPPING.featureId());
} else {
@@ -946,69 +968,239 @@ public class ClientInboundMessageHandler
case ClientOp.TUPLE_UPSERT:
return ClientTupleUpsertRequest.process(
- in, igniteTables, resources, metrics, txManager,
clockService, notificationSender(requestId), tsTracker);
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_GET:
- return ClientTupleGetRequest.process(in, igniteTables,
resources, metrics, txManager, clockService, tsTracker);
+ return ClientTupleGetRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_UPSERT_ALL:
- return ClientTupleUpsertAllRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleUpsertAllRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_GET_ALL:
- return ClientTupleGetAllRequest.process(in, igniteTables,
resources, metrics, txManager, clockService, tsTracker,
-
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
+ return ClientTupleGetAllRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ tsTracker,
+ requestId,
+ firstReqToTxResMap,
+
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)
+ );
case ClientOp.TUPLE_GET_AND_UPSERT:
- return ClientTupleGetAndUpsertRequest.process(in,
igniteTables, resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleGetAndUpsertRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_INSERT:
- return ClientTupleInsertRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleInsertRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_INSERT_ALL:
- return ClientTupleInsertAllRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleInsertAllRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_REPLACE:
- return ClientTupleReplaceRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleReplaceRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_REPLACE_EXACT:
- return ClientTupleReplaceExactRequest.process(in,
igniteTables, resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleReplaceExactRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_GET_AND_REPLACE:
- return ClientTupleGetAndReplaceRequest.process(in,
igniteTables, resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleGetAndReplaceRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_DELETE:
- return ClientTupleDeleteRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleDeleteRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_DELETE_ALL:
- return ClientTupleDeleteAllRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleDeleteAllRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_DELETE_EXACT:
- return ClientTupleDeleteExactRequest.process(in, igniteTables,
resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleDeleteExactRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_DELETE_ALL_EXACT:
- return ClientTupleDeleteAllExactRequest.process(in,
igniteTables, resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleDeleteAllExactRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_GET_AND_DELETE:
- return ClientTupleGetAndDeleteRequest.process(in,
igniteTables, resources, metrics, txManager, clockService,
- notificationSender(requestId), tsTracker);
+ return ClientTupleGetAndDeleteRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ notificationSender(requestId),
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_CONTAINS_KEY:
- return ClientTupleContainsKeyRequest.process(in, igniteTables,
resources, metrics, txManager, clockService, tsTracker);
+ return ClientTupleContainsKeyRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
+ );
case ClientOp.TUPLE_CONTAINS_ALL_KEYS:
- return ClientTupleContainsAllKeysRequest.process(in,
igniteTables, resources, metrics, txManager, clockService, tsTracker,
-
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS));
+ return ClientTupleContainsAllKeysRequest.process(
+ in,
+ igniteTables,
+ resources,
+ metrics,
+ txManager,
+ clockService,
+ tsTracker,
+ requestId,
+ firstReqToTxResMap,
+
clientContext.hasFeature(TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS)
+ );
case ClientOp.JDBC_CONNECT:
return ClientJdbcConnectRequest.execute(in,
jdbcQueryEventHandler, resolveCurrentUsername());
@@ -1057,8 +1249,14 @@ public class ClientInboundMessageHandler
clientContext.hasFeature(TX_PIGGYBACK),
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES), tsTracker);
case ClientOp.TX_ROLLBACK:
- return ClientTransactionRollbackRequest.process(in, resources,
metrics, igniteTables,
- clientContext.hasFeature(TX_PIGGYBACK),
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES));
+ return ClientTransactionRollbackRequest.process(in,
+ resources,
+ metrics,
+ igniteTables,
+ firstReqToTxResMap,
+ clientContext.hasFeature(TX_PIGGYBACK),
+
clientContext.hasFeature(TX_DIRECT_MAPPING_SEND_REMOTE_WRITES)
+ );
case ClientOp.COMPUTE_EXECUTE:
return ClientComputeExecuteRequest.process(in, compute,
clusterService, notificationSender(requestId), clientContext);
@@ -1112,6 +1310,7 @@ public class ClientInboundMessageHandler
igniteTables,
clockService,
notificationSender(requestId),
+ firstReqToTxResMap,
resolveCurrentUsername(),
clientContext.hasFeature(SQL_MULTISTATEMENT_SUPPORT),
clientContext.hasFeature(SQL_PARTITION_AWARENESS_TABLE_NAME),
@@ -1143,12 +1342,26 @@ public class ClientInboundMessageHandler
case ClientOp.SQL_QUERY_META:
return ClientSqlQueryMetadataRequest.process(
- partitionOperationsExecutor, in, queryProcessor,
resources, metrics, tsTracker
+ partitionOperationsExecutor,
+ in,
+ queryProcessor,
+ resources,
+ metrics,
+ tsTracker,
+ requestId,
+ firstReqToTxResMap
);
case ClientOp.SQL_EXEC_BATCH:
return ClientSqlExecuteBatchRequest.process(
- in, queryProcessor, resources, metrics, requestId,
cancelHandles, tsTracker,
+ in,
+ queryProcessor,
+ resources,
+ metrics,
+ requestId,
+ cancelHandles,
+ tsTracker,
+ firstReqToTxResMap,
resolveCurrentUsername()
);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
index 4f0c9270235..5ab769f45ab 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteBatchRequest.java
@@ -48,6 +48,7 @@ public class ClientSqlExecuteBatchRequest {
* @param cancelHandleMap Registry of handlers. Request must register
itself in this registry before switching to another
* thread.
* @param username Authenticated user name.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -58,6 +59,7 @@ public class ClientSqlExecuteBatchRequest {
long requestId,
Map<Long, CancelHandle> cancelHandleMap,
HybridTimestampTracker tsTracker,
+ Map<Long, Long> reqToTxMap,
String username
) {
CancelHandle cancelHandle = CancelHandle.create();
@@ -71,7 +73,9 @@ public class ClientSqlExecuteBatchRequest {
null,
null,
null,
- null
+ null,
+ requestId,
+ reqToTxMap
);
ClientSqlProperties props = new ClientSqlProperties(in, false);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
index ec39a525a68..98760fb079f 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlExecuteRequest.java
@@ -79,6 +79,7 @@ public class ClientSqlExecuteRequest {
* transaction.
* @param notificationSender Notification sender is required to send
acknowledge for underlying write operation within a remote
* transaction.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @param username Authenticated user name or {@code null} for unknown
user.
* @return Future representing result of operation.
*/
@@ -97,6 +98,7 @@ public class ClientSqlExecuteRequest {
IgniteTables tables,
ClockService clockService,
NotificationSender notificationSender,
+ Map<Long, Long> reqToTxMap,
@Nullable String username,
boolean sqlMultistatementSupported,
boolean sqlPartitionAwarenessQualifiedNameSupported,
@@ -119,7 +121,9 @@ public class ClientSqlExecuteRequest {
txManager,
tables,
notificationSender,
- resIdHolder
+ resIdHolder,
+ requestId,
+ reqToTxMap
);
ClientSqlProperties props = new ClientSqlProperties(in,
sqlMultistatementSupported);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
index 06c63951060..46c84d300aa 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/sql/ClientSqlQueryMetadataRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.sql;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.readTx;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -43,6 +44,8 @@ public class ClientSqlQueryMetadataRequest {
* @param in Unpacker.
* @param processor SQL API.
* @param resources Resources.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future representing result of operation.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -51,9 +54,21 @@ public class ClientSqlQueryMetadataRequest {
QueryProcessor processor,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- CompletableFuture<InternalTransaction> txFut = readTx(in, tsTracker,
resources, metrics, null, null, null, null);
+ CompletableFuture<InternalTransaction> txFut = readTx(in,
+ tsTracker,
+ resources,
+ metrics,
+ null,
+ null,
+ null,
+ null,
+ requestId,
+ reqToTxMap
+ );
String schema = in.unpackString();
String query = in.unpackString();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 2fbf674f21a..c954cac1feb 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHE
import java.util.BitSet;
import java.util.Collection;
import java.util.EnumSet;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
@@ -422,6 +423,8 @@ public class ClientTableCommon {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Transaction, if present, or null.
*/
public static CompletableFuture<@Nullable InternalTransaction> readTx(
@@ -432,7 +435,9 @@ public class ClientTableCommon {
@Nullable TxManager txManager,
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
- long[] resourceIdHolder
+ long[] resourceIdHolder,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
return readTx(
in,
@@ -443,6 +448,8 @@ public class ClientTableCommon {
tables,
notificationSender,
resourceIdHolder,
+ requestId,
+ reqToTxMap,
EnumSet.noneOf(RequestOptions.class)
);
}
@@ -456,6 +463,8 @@ public class ClientTableCommon {
* @param txManager Tx manager.
* @param notificationSender Notification sender.
* @param resourceIdHolder Resource id holder.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @param options Request options. Defines how a request is processed.
* @return Transaction, if present, or null.
*/
@@ -468,6 +477,8 @@ public class ClientTableCommon {
@Nullable IgniteTables tables,
@Nullable NotificationSender notificationSender,
long[] resourceIdHolder,
+ long requestId,
+ Map<Long, Long> reqToTxMap,
EnumSet<RequestOptions> options
) {
if (in.tryUnpackNil()) {
@@ -509,11 +520,18 @@ public class ClientTableCommon {
});
InternalTxOptions txOptions = builder.build();
- var tx = startExplicitTx(tsUpdater, txManager,
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
+ var tx = new DirectTransactionWithFirstRequest(
+ startExplicitTx(tsUpdater, txManager,
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions),
+ reqToTxMap,
+ requestId
+ );
// Attach resource id only on first direct request.
resourceIdHolder[0] = resources.put(new ClientResource(tx,
tx::rollbackAsync));
+ // Record the mapping between first request and resourceId.
+ reqToTxMap.put(requestId, resourceIdHolder[0]);
+
metrics.transactionsActiveIncrement();
return completedFuture(tx);
@@ -596,9 +614,23 @@ public class ClientTableCommon {
IgniteTables tables,
EnumSet<RequestOptions> options,
@Nullable NotificationSender notificationSender,
- long[] resourceIdHolder
+ long[] resourceIdHolder,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readTx(in, readTs, resources, metrics, txManager, tables,
notificationSender, resourceIdHolder, options)
+ return readTx(
+ in,
+ readTs,
+ resources,
+ metrics,
+ txManager,
+ tables,
+ notificationSender,
+ resourceIdHolder,
+ requestId,
+ reqToTxMap,
+ options
+ )
.thenApply(tx -> {
if (tx == null) {
// Implicit transactions do not use an observation
timestamp because RW never depends on it,
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
index eb006e5682c..4438c0cf5a0 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsAllKeysRequest.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequest
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import java.util.EnumSet;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -47,6 +48,8 @@ public class ClientTupleContainsAllKeysRequest {
* @param txManager Transaction manager.
* @param clockService Clock service.
* @param tsTracker Tracker.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @param supportsOptions {@code True} if supports tx options.
* @return Future.
*/
@@ -58,11 +61,13 @@ public class ClientTupleContainsAllKeysRequest {
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap,
boolean supportsOptions
) {
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY,
HAS_OPTIONS) : of(KEY_ONLY);
- return ClientTuplesRequestBase.readAsync(in, tables, resources,
metrics, txManager, null, tsTracker, options)
+ return ClientTuplesRequestBase.readAsync(in, tables, resources,
metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
.thenCompose(req ->
req.table().recordView().containsAllAsync(req.tx(), req.tuples())
.thenApply(containsAll -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
index 8a5ffd08f65..1c41263cf26 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleContainsKeyRequest.java
@@ -21,7 +21,9 @@ import static java.util.EnumSet.of;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
+import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +45,8 @@ public class ClientTupleContainsKeyRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Transaction manager.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -52,9 +56,11 @@ public class ClientTupleContainsKeyRequest {
ClientHandlerMetricSource metrics,
TxManager txManager,
ClockService clockService,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return ClientTupleRequestBase.readAsync(in, tables, resources,
metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
+ return readAsync(in, tables, resources, metrics, txManager, null,
tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req ->
req.table().recordView().containsAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
index e53caedd770..b8183426664 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllExactRequest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +46,8 @@ public class ClientTupleDeleteAllExactRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -55,13 +58,27 @@ public class ClientTupleDeleteAllExactRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().deleteAllExactAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples,
req.table().schemaView());
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
index 68c5b8ecfbe..0e3cc2b9a94 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteAllRequest.java
@@ -21,7 +21,9 @@ import static java.util.EnumSet.of;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTuples;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
+import static org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +47,8 @@ public class ClientTupleDeleteAllRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -55,9 +59,11 @@ public class ClientTupleDeleteAllRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
+ return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req ->
req.table().recordView().deleteAllAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
index 82233f81bf9..c95f09358bc 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteExactRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +45,8 @@ public class ClientTupleDeleteExactRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -54,14 +57,28 @@ public class ClientTupleDeleteExactRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().deleteExactAsync(req.tx(), req.tuple())
- .thenApply(res -> out -> {
- writeTxMeta(out, tsTracker, clockService, req);
- out.packInt(req.table().schemaView().lastKnownSchemaVersion());
- out.packBoolean(res);
- }));
+ .thenApply(res -> out -> {
+ writeTxMeta(out, tsTracker, clockService, req);
+ out.packInt(req.table().schemaView().lastKnownSchemaVersion());
+ out.packBoolean(res);
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
index 339227e18e4..6cbf6115aeb 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleDeleteRequest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.client.handler.requests.table;
import static java.util.EnumSet.of;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
+import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +45,8 @@ public class ClientTupleDeleteRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -53,9 +57,11 @@ public class ClientTupleDeleteRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return ClientTupleRequestBase.readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
+ return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req ->
req.table().recordView().deleteAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
index 199464b3385..ead2d85f276 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAllRequest.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequest
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import java.util.EnumSet;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -49,6 +50,8 @@ public class ClientTupleGetAllRequest {
* @param txManager Transaction manager.
* @param clockService Clock service.
* @param tsTracker Tracker.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @param supportsOptions {@code True} if supports tx options.
* @return Future.
*/
@@ -60,11 +63,13 @@ public class ClientTupleGetAllRequest {
TxManager txManager,
ClockService clockService,
HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap,
boolean supportsOptions
) {
EnumSet<RequestOptions> options = supportsOptions ? of(KEY_ONLY,
HAS_OPTIONS) : of(KEY_ONLY);
- return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options)
+ return ClientTuplesRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, options, requestId, reqToTxMap)
.thenCompose(req -> {
return req.table().recordView().getAllAsync(req.tx(),
req.tuples())
.thenApply(resTuples -> out -> {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
index 5133a14bcf5..795a0435fdc 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndDeleteRequest.java
@@ -20,7 +20,9 @@ package org.apache.ignite.client.handler.requests.table;
import static java.util.EnumSet.of;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
+import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +46,8 @@ public class ClientTupleGetAndDeleteRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -54,9 +58,11 @@ public class ClientTupleGetAndDeleteRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return ClientTupleRequestBase.readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY))
+ return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, of(KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req ->
req.table().recordView().getAndDeleteAsync(req.tx(), req.tuple())
.thenApply(resTuple -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
index 6fb485021d4..36dc64c8f1c 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndReplaceRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +46,8 @@ public class ClientTupleGetAndReplaceRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -55,13 +58,27 @@ public class ClientTupleGetAndReplaceRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().getAndReplaceAsync(req.tx(), req.tuple())
.thenApply(resTuple -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
ClientTableCommon.writeTupleOrNil(out, resTuple,
TuplePart.KEY_AND_VAL, req.table().schemaView());
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
index d8f3420df2a..620e3191580 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetAndUpsertRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -45,6 +46,8 @@ public class ClientTupleGetAndUpsertRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -55,13 +58,27 @@ public class ClientTupleGetAndUpsertRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().getAndUpsertAsync(req.tx(), req.tuple())
.thenApply(resTuple -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
ClientTableCommon.writeTupleOrNil(out, resTuple,
TuplePart.KEY_AND_VAL, req.table().schemaView());
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
index 48fd2ca0062..2d71d2cd828 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleGetRequest.java
@@ -21,7 +21,9 @@ import static java.util.EnumSet.of;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.KEY_ONLY;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_ONLY;
+import static org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +46,8 @@ public class ClientTupleGetRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param clockService Clock service.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -53,9 +57,11 @@ public class ClientTupleGetRequest {
ClientHandlerMetricSource metrics,
TxManager txManager,
ClockService clockService,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return ClientTupleRequestBase.readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY))
+ return readAsync(in, tables, resources, metrics, txManager, null, tsTracker, of(READ_ONLY, KEY_ONLY), requestId, reqToTxMap)
.thenCompose(req ->
req.table().recordView().getAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
index ecc68421db5..b5fc7d80f40 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertAllRequest.java
@@ -22,6 +22,7 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -47,6 +48,8 @@ public class ClientTupleInsertAllRequest {
* @param txManager Ignite transactions.
* @param clockService Clock service.
* @param notificationSender Notification sender.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -57,13 +60,26 @@ public class ClientTupleInsertAllRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().insertAllAsync(req.tx(), req.tuples())
.thenApply(skippedTuples -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
writeTuples(out, skippedTuples,
req.table().schemaView());
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
index c8bfe6b60e2..1dada6202e2 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleInsertRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -46,6 +47,8 @@ public class ClientTupleInsertRequest {
* @param txManager Ignite transactions.
* @param clockService Clock service.
* @param notificationSender Notification sender.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -56,14 +59,28 @@ public class ClientTupleInsertRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().insertAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
index ecf2c300eac..a94f77b3db8 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceExactRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.of;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.RequestOptions.READ_SECOND_TUPLE;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -43,6 +44,8 @@ public class ClientTupleReplaceExactRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -53,7 +56,9 @@ public class ClientTupleReplaceExactRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
return ClientTupleRequestBase.readAsync(
in,
@@ -63,7 +68,9 @@ public class ClientTupleReplaceExactRequest {
txManager,
notificationSender,
tsTracker,
- of(READ_SECOND_TUPLE)
+ of(READ_SECOND_TUPLE),
+ requestId,
+ reqToTxMap
)
.thenCompose(req ->
req.table().recordView().replaceExactAsync(req.tx(), req.tuple(), req.tuple2())
.thenApply(res -> out -> {
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
index 86d6d58078d..ed5f141342e 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleReplaceRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +45,8 @@ public class ClientTupleReplaceRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -54,14 +57,28 @@ public class ClientTupleReplaceRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().replaceAsync(req.tx(), req.tuple())
.thenApply(res -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
out.packBoolean(res);
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
index b2cdc608a87..66fd12c134d 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleRequestBase.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequest
import java.util.BitSet;
import java.util.EnumSet;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -93,7 +94,9 @@ class ClientTupleRequestBase {
TxManager txManager,
@Nullable NotificationSender notificationSender,
HybridTimestampTracker tsTracker,
- EnumSet<RequestOptions> options
+ EnumSet<RequestOptions> options,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
int tableId = in.unpackInt();
@@ -108,7 +111,9 @@ class ClientTupleRequestBase {
tables,
options,
notificationSender,
- resIdHolder
+ resIdHolder,
+ requestId,
+ reqToTxMap
);
int schemaId = in.unpackInt();
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
index 1d6604e0290..a6f4d14beef 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertAllRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTuplesRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -44,6 +45,8 @@ public class ClientTupleUpsertAllRequest {
* @param tables Ignite tables.
* @param resources Resource registry.
* @param txManager Ignite transactions.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -54,13 +57,27 @@ public class ClientTupleUpsertAllRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager, notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().upsertAllAsync(req.tx(), req.tuples())
.thenApply(v -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
- }));
+ })
+ );
}
}
diff --git a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
index 58efb8accc8..d549ec8ee82 100644
--- a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
+++ b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTupleUpsertRequest.java
@@ -21,6 +21,7 @@ import static java.util.EnumSet.noneOf;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.writeTxMeta;
import static
org.apache.ignite.client.handler.requests.table.ClientTupleRequestBase.readAsync;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -37,6 +38,7 @@ import org.apache.ignite.table.IgniteTables;
* Client tuple upsert request.
*/
public class ClientTupleUpsertRequest {
+
/**
* Processes the request.
*
@@ -46,6 +48,8 @@ public class ClientTupleUpsertRequest {
* @param txManager Ignite transactions.
* @param clockService Clock service.
* @param notificationSender Notification sender.
+ * @param requestId Id of the request.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -56,13 +60,27 @@ public class ClientTupleUpsertRequest {
TxManager txManager,
ClockService clockService,
NotificationSender notificationSender,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
- return readAsync(in, tables, resources, metrics, txManager,
notificationSender, tsTracker, noneOf(RequestOptions.class))
+ return readAsync(
+ in,
+ tables,
+ resources,
+ metrics,
+ txManager,
+ notificationSender,
+ tsTracker,
+ noneOf(RequestOptions.class),
+ requestId,
+ reqToTxMap
+ )
.thenCompose(req ->
req.table().recordView().upsertAsync(req.tx(), req.tuple())
.thenApply(v -> out -> {
writeTxMeta(out, tsTracker, clockService, req);
out.packInt(req.table().schemaView().lastKnownSchemaVersion());
- }));
+ })
+ );
}
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
index 43961bcd6d3..05975e62fcc 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTuplesRequestBase.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -77,14 +78,27 @@ class ClientTuplesRequestBase {
TxManager txManager,
@Nullable NotificationSender notificationSender,
HybridTimestampTracker tsTracker,
- EnumSet<RequestOptions> options
+ EnumSet<RequestOptions> options,
+ long requestId,
+ Map<Long, Long> reqToTxMap
) {
int tableId = in.unpackInt();
long[] resIdHolder = {0};
- CompletableFuture<InternalTransaction> txFut =
- readOrStartImplicitTx(in, tsTracker, resources, metrics,
txManager, tables, options, notificationSender, resIdHolder);
+ CompletableFuture<InternalTransaction> txFut = readOrStartImplicitTx(
+ in,
+ tsTracker,
+ resources,
+ metrics,
+ txManager,
+ tables,
+ options,
+ notificationSender,
+ resIdHolder,
+ requestId,
+ reqToTxMap
+ );
int schemaId = in.unpackInt();
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java
new file mode 100644
index 00000000000..e5482e1ea56
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/DirectTransactionWithFirstRequest.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.table;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.wrapper.Wrapper;
+import org.apache.ignite.tx.TransactionException;
+import org.jetbrains.annotations.Nullable;
+
+class DirectTransactionWithFirstRequest implements InternalTransaction,
Wrapper {
+ private final InternalTransaction base;
+
+ // We could also just accept a lambda.
+ private final Map<Long, Long> reqToTxMap;
+
+ private final long firstReqId;
+
+ DirectTransactionWithFirstRequest(InternalTransaction base, Map<Long,
Long> reqToTxMap, long firstReqId) {
+ this.base = base;
+ this.reqToTxMap = reqToTxMap;
+ this.firstReqId = firstReqId;
+ }
+
+ @Override
+ public UUID id() {
+ return base.id();
+ }
+
+ @Override
+ public PendingTxPartitionEnlistment enlistedPartition(ZonePartitionId
replicationGroupId) {
+ return base.enlistedPartition(replicationGroupId);
+ }
+
+ @Override
+ public TxState state() {
+ return base.state();
+ }
+
+ @Override
+ public boolean assignCommitPartition(ZonePartitionId commitPartitionId) {
+ return base.assignCommitPartition(commitPartitionId);
+ }
+
+ @Override
+ public ZonePartitionId commitPartition() {
+ return base.commitPartition();
+ }
+
+ @Override
+ public void enlist(ZonePartitionId replicationGroupId, int tableId, String
primaryNodeConsistentId, long consistencyToken) {
+ base.enlist(replicationGroupId, tableId, primaryNodeConsistentId,
consistencyToken);
+ }
+
+ @Override
+ public @Nullable HybridTimestamp readTimestamp() {
+ return base.readTimestamp();
+ }
+
+ @Override
+ public HybridTimestamp schemaTimestamp() {
+ return base.schemaTimestamp();
+ }
+
+ @Override
+ public UUID coordinatorId() {
+ return base.coordinatorId();
+ }
+
+ @Override
+ public boolean implicit() {
+ return base.implicit();
+ }
+
+ @Override
+ public boolean remote() {
+ return base.remote();
+ }
+
+ @Override
+ public boolean remoteOnCoordinator() {
+ return base.remoteOnCoordinator();
+ }
+
+ @Override
+ public CompletableFuture<Void> finish(boolean commit, @Nullable
HybridTimestamp executionTimestamp, boolean full,
+ @Nullable Throwable finishReason) {
+ return base.finish(commit, executionTimestamp, full,
finishReason).whenComplete((v, err) -> removeMapping());
+ }
+
+ @Override
+ public boolean isFinishingOrFinished() {
+ return base.isFinishingOrFinished();
+ }
+
+ @Override
+ public long getTimeout() {
+ return base.getTimeout();
+ }
+
+ @Override
+ public CompletableFuture<Void> kill() {
+ return base.kill().whenComplete((v, err) -> removeMapping());
+ }
+
+ @Override
+ public CompletableFuture<Void> rollbackWithExceptionAsync(Throwable
throwable) {
+ return base.rollbackWithExceptionAsync(throwable).whenComplete((v,
err) -> removeMapping());
+ }
+
+ @Override
+ public boolean isRolledBackWithTimeoutExceeded() {
+ return base.isRolledBackWithTimeoutExceeded();
+ }
+
+ @Override
+ public void processDelayedAck(Object val, @Nullable Throwable err) {
+ base.processDelayedAck(val, err);
+ }
+
+ @Override
+ public void commit() throws TransactionException {
+ try {
+ base.commit();
+ } finally {
+ removeMapping();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> commitAsync() {
+ return base.commitAsync().whenComplete((v, err) -> removeMapping());
+ }
+
+ @Override
+ public void rollback() throws TransactionException {
+ try {
+ base.rollback();
+ } finally {
+ removeMapping();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> rollbackAsync() {
+ return base.rollbackAsync().whenComplete((v, err) -> removeMapping());
+ }
+
+ @Override
+ public boolean isReadOnly() {
+ return base.isReadOnly();
+ }
+
+ @Override
+ public RuntimeException enlistFailedException() {
+ return base.enlistFailedException();
+ }
+
+ public InternalTransaction base() {
+ return base;
+ }
+
+ @Override
+ public <T> T unwrap(Class<T> classToUnwrap) {
+ return (T) base;
+ }
+
+ private void removeMapping() {
+ reqToTxMap.remove(firstReqId);
+ }
+}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
index df377d133ae..d2d26703256 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionCommitRequest.java
@@ -38,6 +38,7 @@ import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.internal.wrapper.Wrappers;
import org.apache.ignite.tx.TransactionException;
/**
@@ -89,7 +90,7 @@ public class ClientTransactionCommitRequest {
// Update causality. Used to assign commit timestamp after all
enlistments.
clockService.updateClock(HybridTimestamp.hybridTimestamp(causality));
- ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+ ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx,
ReadWriteTransactionImpl.class);
// Enforce cleanup.
tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
index c6492dfa99a..9fee94a501a 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionRollbackRequest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.client.handler.requests.tx;
import static
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest.merge;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
@@ -29,6 +30,9 @@ import org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.impl.ReadWriteTransactionImpl;
+import org.apache.ignite.internal.wrapper.Wrappers;
+import org.apache.ignite.lang.ErrorGroups.Client;
+import org.apache.ignite.lang.IgniteException;
/**
* Client transaction rollback request.
@@ -41,6 +45,7 @@ public class ClientTransactionRollbackRequest {
* @param resources Resources.
* @param metrics Metrics.
* @param igniteTables Tables facade.
+ * @param reqToTxMap Tracker for first request of direct transactions.
* @param enableDirectMapping Enable direct mapping.
* @param sendRemoteWritesFlag Send remote writes flag.
* @return Future.
@@ -50,37 +55,55 @@ public class ClientTransactionRollbackRequest {
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
IgniteTablesInternal igniteTables,
+ Map<Long, Long> reqToTxMap,
boolean enableDirectMapping,
boolean sendRemoteWritesFlag
)
throws IgniteInternalCheckedException {
long resourceId = in.unpackLong();
- InternalTransaction tx =
resources.remove(resourceId).get(InternalTransaction.class);
+ InternalTransaction tx;
- if (enableDirectMapping && !tx.isReadOnly()) {
- // Attempt to merge server and client transactions.
- int cnt = in.unpackInt(); // Number of direct enlistments.
- for (int i = 0; i < cnt; i++) {
- int tableId = in.unpackInt();
- int partId = in.unpackInt();
- String consistentId = in.unpackString();
- long token = in.unpackLong();
+ if (!enableDirectMapping) {
+ tx = resources.remove(resourceId).get(InternalTransaction.class);
+ } else if (resourceId < 0) {
+ // Direct mapping was enabled, but the client does not know the
resourceId, so it sent the first request id instead.
+ long reqId = -resourceId;
+ var actualResourceId = reqToTxMap.get(reqId);
- TableViewInternal table = igniteTables.cachedTable(tableId);
+ if (actualResourceId == null) {
+ throw new IgniteException(Client.RESOURCE_NOT_FOUND_ERR,
"Failed to find resource from requestId: " + reqId);
+ }
+
+ tx =
resources.remove(actualResourceId).get(InternalTransaction.class);
+ // Will not remove right away from reqToTxMap, it will be removed
automatically on rollback.
+ } else {
+ tx = resources.remove(resourceId).get(InternalTransaction.class);
+
+ if (!tx.isReadOnly()) {
+ // Attempt to merge server and client transactions.
+ int cnt = in.unpackInt(); // Number of direct enlistments.
+ for (int i = 0; i < cnt; i++) {
+ int tableId = in.unpackInt();
+ int partId = in.unpackInt();
+ String consistentId = in.unpackString();
+ long token = in.unpackLong();
- if (table != null) {
- merge(table.internalTable(), partId, consistentId, token,
tx, false);
+ TableViewInternal table =
igniteTables.cachedTable(tableId);
+
+ if (table != null) {
+ merge(table.internalTable(), partId, consistentId,
token, tx, false);
+ }
}
- }
- if (cnt > 0) {
- in.unpackLong(); // Unpack causality.
+ if (cnt > 0) {
+ in.unpackLong(); // Unpack causality.
- ReadWriteTransactionImpl tx0 = (ReadWriteTransactionImpl) tx;
+ ReadWriteTransactionImpl tx0 = Wrappers.unwrap(tx,
ReadWriteTransactionImpl.class);
- // Enforce cleanup.
- tx0.noRemoteWrites(sendRemoteWritesFlag && in.unpackBoolean());
+ // Enforce cleanup.
+ tx0.noRemoteWrites(sendRemoteWritesFlag &&
in.unpackBoolean());
+ }
}
}
diff --git
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
index 7d18e4b3478..7d87e7da0ad 100644
---
a/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
+++
b/modules/client/src/integrationTest/java/org/apache/ignite/internal/client/ItThinClientTransactionsTest.java
@@ -23,10 +23,14 @@ import static java.util.Comparator.comparing;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.publicException;
import static
org.apache.ignite.internal.IgniteExceptionTestUtils.publicExceptionWithHint;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -60,6 +64,7 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
+import org.apache.ignite.client.handler.ClientInboundMessageHandler;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.client.sql.ClientSql;
import org.apache.ignite.internal.client.sql.PartitionMappingProvider;
@@ -90,7 +95,7 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -1424,34 +1429,160 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key,
key2)), willSucceedFast());
}
- @Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-27947")
- public void testRollbackDoesNotBlockOnLockConflictDuringFirstRequest()
throws InterruptedException {
- // Note: reversed tx priority is required for this test.
- ClientTable table = (ClientTable) table();
- KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+ @Nested
+ class OnConflictDuringFirstRequest {
+ class Data {
+ final IgniteImpl ignite;
+ final List<Tuple> tuples;
+ final ClientLazyTransaction tx1;
+ final ClientLazyTransaction tx2;
+ final CompletableFuture<?> req2Fut;
+
+ Data(
+ IgniteImpl ignite,
+ List<Tuple> tuples,
+ ClientLazyTransaction tx1,
+ ClientLazyTransaction tx2,
+ CompletableFuture<?> req2Fut
+ ) {
+ this.ignite = ignite;
+ this.tuples = tuples;
+ this.tx1 = tx1;
+ this.tx2 = tx2;
+ this.req2Fut = req2Fut;
+ }
+ }
- Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicasAsync().join();
- List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, 0, table);
+ Data prepareBlockedTransaction(KillTestContext ctx) throws
InterruptedException {
+ ClientTable table = (ClientTable) table();
+ ClientSql sql = (ClientSql) client().sql();
- // We need a waiter for this scenario.
- Tuple key = tuples0.get(0);
- Tuple val = val("1");
+ Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicasAsync().join();
+ Entry<Partition, ClusterNode> mapping =
map.entrySet().iterator().next();
+ List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int)
mapping.getKey().id(), table);
+ Ignite server = server(mapping.getValue());
+ IgniteImpl ignite = unwrapIgniteImpl(server);
+
+ // Init SQL mappings.
+ Tuple key0 = tuples0.get(0);
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+ await().atMost(2, TimeUnit.SECONDS)
+ .until(() ->
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+ // We need a waiter for this scenario.
+ Tuple key = tuples0.get(1);
+
+ ClientLazyTransaction tx2 = (ClientLazyTransaction)
client().transactions().begin();
+ ClientLazyTransaction tx1 = (ClientLazyTransaction)
client().transactions().begin();
+
+ // Starts the transaction.
+ assertThat(ctx.put.apply(client(), tx1, key), willSucceedIn(120,
TimeUnit.SECONDS));
+
+ await().atMost(3, TimeUnit.SECONDS).until(() -> {
+ Iterator<Lock> locks =
ignite.txManager().lockManager().locks(tx1.startedTx().txId());
+
+ int count = CollectionUtils.count(locks);
+ return count == 2;
+ });
+
+ // Will wait for lock.
+ CompletableFuture<?> fut2 = ctx.put.apply(client(), tx2, key);
+ Thread.sleep(500);
+
+ assertThat(fut2.isDone(), is(false));
+ IgniteTestUtils.assertThrows(AssertionError.class, () ->
ClientTransaction.get(tx2), "Transaction is starting");
+
+ return new Data(ignite, tuples0, tx1, tx2, fut2);
+ }
- ClientLazyTransaction tx1 = (ClientLazyTransaction)
client().transactions().begin();
- ClientLazyTransaction tx2 = (ClientLazyTransaction)
client().transactions().begin();
+ @ParameterizedTest
+
@MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory")
+ public void testRollbackDoesNotBlock(KillTestContext ctx) throws
InterruptedException {
+ var test = prepareBlockedTransaction(ctx);
- kvView.put(tx1, key, val);
+ // Rollback should not be blocked.
+ assertThat(test.tx2.rollbackAsync(), willSucceedIn(1,
TimeUnit.SECONDS));
+ assertThat(test.req2Fut, willThrowFast(
+ ctx.expectedErr,
+ "Can't acquire a lock because transaction is already
finished"));
- // Will wait for lock.
- CompletableFuture<Void> fut2 = kvView.putAsync(tx2, key, val);
- assertFalse(fut2.isDone());
+ assertThat(test.tx1.rollbackAsync(), willSucceedIn(1,
TimeUnit.SECONDS));
- Thread.sleep(500);
+ var ex = assertThrows(TransactionException.class, () ->
ClientTransaction.get(test.tx2));
+ assertThat(ex.getMessage(), containsString("Transaction is already
finished"));
+ assertThat(ex.getMessage(), containsString("committed=false"));
- // Rollback should not be blocked.
- assertThat(tx2.rollbackAsync(), willSucceedFast());
- assertThat(tx1.rollbackAsync(), willSucceedFast());
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+ assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)),
willSucceedIn(5, TimeUnit.SECONDS));
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory")
+ public void testOperationsBlockWaitingForLock(KillTestContext ctx)
throws InterruptedException {
+ var test = prepareBlockedTransaction(ctx);
+
+ CompletableFuture<?> fut3 = ctx.put.apply(client(), test.tx2,
test.tuples.get(2));
+
+ assertDoesNotThrow(test.tx1::startedTx);
+
+ assertThat(test.tx1.rollbackAsync(), willSucceedIn(1,
TimeUnit.SECONDS));
+
+ // After the lock is released, the requests are free to complete.
+ assertThat(test.req2Fut, willSucceedIn(1, TimeUnit.SECONDS));
+ assertThat(fut3, willSucceedIn(1, TimeUnit.SECONDS));
+
+ assertThat(test.tx2.rollbackAsync(), willSucceedIn(1,
TimeUnit.SECONDS));
+
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+ assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 3)),
willSucceedIn(5, TimeUnit.SECONDS));
+ }
+
+ @ParameterizedTest
+
@MethodSource("org.apache.ignite.internal.client.ItThinClientTransactionsTest#killTestContextFactory")
+ public void testCancelByRequestIdNotAvailable(KillTestContext ctx)
throws InterruptedException {
+ var test = prepareBlockedTransaction(ctx);
+
+ // Remove firstReqMapping from the server side.
+ {
+ Map<Long, Long> firstReqToTxResMap =
IgniteTestUtils.getFieldValue(
+ test.ignite.clientInboundMessageHandler(),
+ ClientInboundMessageHandler.class,
+ "firstReqToTxResMap"
+ );
+
+ CompletableFuture<Object> reqInfoFut =
IgniteTestUtils.getFieldValue(test.tx2, ClientLazyTransaction.class,
+ "requestInfoFuture");
+ Object requestInfo = reqInfoFut.join();
+ long firstReqId = IgniteTestUtils.getFieldValue(requestInfo,
"firstReqId");
+ firstReqToTxResMap.remove(firstReqId);
+ }
+
+ // Will block because of the error.
+ var rollbackTx2Fut1 = test.tx2.rollbackAsync();
+ Thread.sleep(1_000);
+ assertThat(rollbackTx2Fut1.isDone(), is(false));
+
+ // Do another concurrent rollback call just to make sure.
+ // If we allow multiple rollback requests by id to be sent
concurrently, the outcome might be different.
+ var rollbackTx2Fut2 = test.tx2.rollbackAsync();
+ Thread.sleep(1_000);
+ assertThat(rollbackTx2Fut2.isDone(), is(false));
+
+ // Now unblock the transaction.
+ assertThat(test.tx1.rollbackAsync(), willSucceedIn(1,
TimeUnit.SECONDS));
+
+ // Requests should rollback.
+ assertThat(rollbackTx2Fut1, willSucceedIn(1, TimeUnit.SECONDS));
+ assertThat(rollbackTx2Fut2, willSucceedIn(1, TimeUnit.SECONDS));
+
+ var ex = assertThrows(TransactionException.class, () ->
ClientTransaction.get(test.tx2));
+ assertThat(ex.getMessage(), containsString("Transaction is already
finished"));
+ assertThat(ex.getMessage(), containsString("committed=false"));
+
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+ assertThat(kvView.removeAllAsync(null, test.tuples.subList(0, 2)),
willSucceedIn(5, TimeUnit.SECONDS));
+ }
}
@ParameterizedTest
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
index d932247b62f..6c9c7c5711a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/PayloadOutputChannel.java
@@ -17,8 +17,9 @@
package org.apache.ignite.internal.client;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.ignite.internal.client.proto.ClientMessagePacker;
-import org.jetbrains.annotations.Nullable;
/**
* Thin client payload output channel.
@@ -33,8 +34,8 @@ public class PayloadOutputChannel implements AutoCloseable {
/** Client request ID. */
private final long requestId;
- /** Action to be executed when the payload is sent. */
- private volatile @Nullable Runnable onSentAction;
+ /** Actions to be executed when the payload is sent. */
+ private final List<Runnable> onSentActions;
/**
* Constructor.
@@ -47,6 +48,7 @@ public class PayloadOutputChannel implements AutoCloseable {
this.ch = ch;
this.out = out;
this.requestId = requestId;
+ this.onSentActions = new ArrayList<>();
}
/**
@@ -88,11 +90,11 @@ public class PayloadOutputChannel implements AutoCloseable {
* @param action Action to be executed.
*/
public void onSent(Runnable action) {
- this.onSentAction = action;
+ this.onSentActions.add(action);
}
/** Returns an action, if any, that should be executed when the payload is
sent successfully. */
- @Nullable Runnable onSentAction() {
- return onSentAction;
+ List<Runnable> onSentActions() {
+ return onSentActions;
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 0a9322426a1..ee07a6d6cd8 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -33,6 +33,7 @@ import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -106,7 +107,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD,
- ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2
+ ProtocolBitmaskFeature.SQL_UPDATE_COUNTERS_2,
+ ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST
));
/** Minimum supported heartbeat interval. */
@@ -405,6 +407,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
payloadWriter.accept(payloadCh);
}
+ var actions =
Collections.unmodifiableList(payloadCh.onSentActions());
write(req).addListener(f -> {
if (!f.isSuccess()) {
String msg = "Failed to send request async [id=" + id + ",
op=" + opCode + ", remoteAddress=" + cfg.getAddress() + "]";
@@ -420,8 +423,7 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
} else {
metrics.requestsSentIncrement();
- Runnable action = payloadCh.onSentAction();
- if (action != null) {
+ for (Runnable action : actions) {
asyncContinuationExecutor.execute(action);
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 694191eb744..5ea341b372e 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -337,13 +337,21 @@ public class ClientSql implements IgniteSql {
return txStartFut.thenCompose(tx -> ch.serviceAsync(
ClientOp.SQL_EXEC,
- payloadWriter(ctx, transaction, cancellationToken,
queryModifiers, statement, arguments, shouldTrackOperation),
+ DirectTxUtils.payloadWriter(
+ ctx,
+ transaction,
+ payloadWriter(ctx, transaction, cancellationToken,
queryModifiers, statement, arguments, shouldTrackOperation)
+ ),
payloadReader(ctx, mapper, tx, statement),
() -> DirectTxUtils.resolveChannel(ctx, ch,
shouldTrackOperation, tx, mapping),
null,
false
).handle((BiFunction<AsyncResultSet<T>, Throwable,
CompletableFuture<AsyncResultSet<T>>>) (r, err) -> {
if (err != null) {
+ if (DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch)) {
+ return failedFuture(err);
+ }
+
if (tx != null && shouldRecordTransactionFailure(err)) {
tx.recordOperationFailure(err);
}
@@ -352,26 +360,7 @@ public class ClientSql implements IgniteSql {
return failedFuture(err);
}
- if (ctx.enlistmentToken != null) {
- // In case of direct mapping error need to rollback the tx
on coordinator.
- return tx.rollbackAsync().handle((ignored, err0) -> {
- if (err0 != null) {
- err.addSuppressed(err0);
- }
-
- sneakyThrow(err);
- return null;
- });
- } else {
- return
tx.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
- if (err0 != null) {
- err.addSuppressed(err0);
- }
-
- sneakyThrow(err);
- return null;
- });
- }
+ return DirectTxUtils.handleErrorOnOtherRequests(ctx, tx, err);
}
return completedFuture(r);
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index b4b944c13d5..1c1e50678e4 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -497,7 +497,7 @@ public class ClientTable implements Table {
return txStartFut.thenCompose(tx0 -> {
return ch.serviceAsync(
opCode,
- w -> writer.accept(schema, w, ctx),
+ DirectTxUtils.payloadWriter(ctx, tx, w ->
writer.accept(schema, w, ctx)),
r -> readSchemaAndReadData(schema, r, reader,
defaultValue, responseSchemaRequired, ctx, tx0),
() -> DirectTxUtils.resolveChannel(ctx, ch,
ClientOp.isWrite(opCode), tx0, pm),
retryPolicyOverride,
@@ -508,13 +508,7 @@ public class ClientTable implements Table {
if (ex != null) {
Throwable cause = ex;
- if (ctx.firstReqFut != null) {
- // Create failed transaction.
- ClientTransaction failed = new
ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
- ctx.pm, null,
ch.observableTimestamp(), 0);
- failed.fail();
- ctx.firstReqFut.complete(failed);
- // Txn was not started, rollback
is not required.
+ if
(DirectTxUtils.tryHandleErrorOnFirstRequest(ctx, ch)) {
fut.completeExceptionally(unwrapCause(ex));
return null;
}
@@ -547,37 +541,20 @@ public class ClientTable implements Table {
cause = cause.getCause();
}
+ }
- if (tx0 == null) {
- fut.completeExceptionally(ex);
- } else {
-
tx0.rollbackAndDiscardDirectMappings(false).handle((ignored, err0) -> {
- if (err0 != null) {
- ex.addSuppressed(err0);
- }
-
-
fut.completeExceptionally(ex);
-
- return (T) null;
- });
- }
+ if (tx0 == null) {
+ fut.completeExceptionally(ex);
} else {
- // In case of direct mapping error
we need to rollback the tx on coordinator.
-
tx0.rollbackAsync().handle((ignored, err0) -> {
- if (err0 != null) {
- ex.addSuppressed(err0);
- }
-
- fut.completeExceptionally(ex);
-
- return (T) null;
- });
+
DirectTxUtils.handleErrorOnOtherRequests(ctx, tx0, ex)
+ .whenComplete((ignored,
err0) -> fut.completeExceptionally(err0));
}
+
+ return null;
} else {
fut.complete(ret);
+ return null;
}
-
- return null;
});
});
}).exceptionally(ex -> {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 70bf1765f9f..d22759aa1b5 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -17,14 +17,18 @@
package org.apache.ignite.internal.client.tx;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_ROLLBACK_USING_FIRST_REQUEST;
import static
org.apache.ignite.internal.client.tx.ClientTransactions.USE_CONFIGURED_TIMEOUT_DEFAULT;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.EnumSet;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.ReliableChannel;
+import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -39,12 +43,24 @@ import org.jetbrains.annotations.Nullable;
* Lazy client transaction. Will be actually started on the first operation.
*/
public class ClientLazyTransaction implements Transaction {
+ private static final CompletableFuture<Void>
TX_INFO_ALREADY_RECEIVED_FUTURE =
+ failedFuture(new
UnsupportedOperationException("TX_ROLLBACK_USING_FIRST_REQUEST was skipped
because tx info already present."));
+
+ private static final CompletableFuture<Void> NOT_SUPPORTED_FUTURE =
+ failedFuture(new
UnsupportedOperationException("TX_ROLLBACK_USING_FIRST_REQUEST is not
supported"));
+
private final long observableTimestamp;
private final @Nullable TransactionOptions options;
private final EnumSet<ClientInternalTxOptions> txOptions;
+ private final AtomicBoolean cancelled = new AtomicBoolean(false);
+
+ private final CompletableFuture<RequestInfo> requestInfoFuture = new
CompletableFuture<>();
+
+ private final CompletableFuture<Void> rollbackFuture = new
CompletableFuture<>();
+
private volatile CompletableFuture<ClientTransaction> tx;
ClientLazyTransaction(HybridTimestampTracker observableTimestamp,
@Nullable TransactionOptions options) {
@@ -94,12 +110,47 @@ public class ClientLazyTransaction implements Transaction {
public CompletableFuture<Void> rollbackAsync() {
var tx0 = tx;
+ // TODO: IGNITE-28405 This is really fishy. It will probably let you
reuse a transaction after calling a rollback :(
if (tx0 == null) {
// No operations were performed, nothing to rollback.
return nullCompletedFuture();
}
- return tx0.thenCompose(ClientTransaction::rollbackAsync);
+ if (cancelled.compareAndSet(false, true)) {
+ return CompletableFuture.anyOf(requestInfoFuture, tx0)
+ .thenCompose(input -> {
+ if (tx0.isDone()) {
+ return TX_INFO_ALREADY_RECEIVED_FUTURE;
+ }
+
+ // The input must be from requestInfoFuture
+ RequestInfo reqInfo = (RequestInfo) input;
+ ClientChannel ch = reqInfo.ch;
+ if
(ch.protocolContext().isFeatureSupported(TX_ROLLBACK_USING_FIRST_REQUEST)) {
+ return ch.serviceAsync(ClientOp.TX_ROLLBACK, w ->
w.out().packLong(-reqInfo.firstReqId), r -> null);
+ } else {
+ return NOT_SUPPORTED_FUTURE;
+ }
+ })
+ .handle((res, e) -> {
+ // If the rollback by first request id succeeded, we are done. Otherwise,
fall back to waiting for the tx to start and rolling it back the regular way.
+ if (e == null) {
+ return nullCompletedFuture();
+ } else {
+ return
tx0.thenCompose(ClientTransaction::rollbackAsync);
+ }
+ })
+ .thenCompose(f -> (CompletableFuture<Void>) f)
+ .whenComplete((res, e) -> {
+ if (e != null) {
+ rollbackFuture.completeExceptionally(e);
+ } else {
+ rollbackFuture.complete(null);
+ }
+ });
+ } else {
+ return rollbackFuture;
+ }
}
@Override
@@ -221,8 +272,23 @@ public class ClientLazyTransaction implements Transaction {
return txOptions;
}
+ public void updateRequestInfo(long firstReqId, ClientChannel ch) {
+ boolean s = this.requestInfoFuture.complete(new
RequestInfo(firstReqId, ch));
+ assert s : "Transaction request info was previously set";
+ }
+
@Override
public String toString() {
return S.toString(this);
}
+
+ private static class RequestInfo {
+ private final long firstReqId;
+ private final ClientChannel ch;
+
+ private RequestInfo(long firstReqId, ClientChannel ch) {
+ this.firstReqId = firstReqId;
+ this.ch = ch;
+ }
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index eaab408ace6..b368b049fe2 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -86,7 +86,7 @@ public class ClientTransaction implements Transaction {
@IgniteToStringExclude
private final ClientChannel ch;
- /** Transaction id. */
+ /** Node-local resource id for the Transaction. */
private final long id;
/** The future used on repeated commit/rollback. */
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
index 811a5689593..d0ce111c6ba 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions
import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_DIRECT;
import static
org.apache.ignite.internal.client.proto.tx.ClientTxUtils.TX_ID_FIRST_DIRECT;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.lang.ErrorGroups.Client.CONNECTION_ERR;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -34,6 +35,7 @@ import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.PartitionMapping;
import org.apache.ignite.internal.client.PayloadInputChannel;
import org.apache.ignite.internal.client.PayloadOutputChannel;
+import org.apache.ignite.internal.client.PayloadWriter;
import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.WriteContext;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
@@ -282,6 +284,90 @@ public class DirectTxUtils {
});
}
+ /**
+ * If the current request is the first request of a direct tx, add a
listener to the {@link PayloadWriter}.
+ *
+ * @param ctx The {@link WriteContext} that holds transactional context
information.
+ * @param tx The client transaction associated with the request, or {@code
null} if none.
+ * @param base Request native {@link PayloadWriter}.
+ * @return The {@link PayloadWriter} that should be used on the request.
+ */
+ public static PayloadWriter payloadWriter(WriteContext ctx, @Nullable
Transaction tx, PayloadWriter base) {
+ if (ctx.firstReqFut != null && tx instanceof ClientLazyTransaction) {
+ return poc -> {
+ base.accept(poc);
+
+ var clientLazyTx = (ClientLazyTransaction) tx;
+ long requestId = poc.requestId();
+ ClientChannel cc = poc.clientChannel();
+ poc.onSent(() -> clientLazyTx.updateRequestInfo(requestId,
cc));
+ };
+ } else {
+ return base;
+ }
+ }
+
+ /**
+ * Try to handle error on first request. Returns false if not in the
context of the first request.
+ * This method essentially completes the first request future of the context with a failed
transaction instance.
+ *
+ * @param ctx The {@link WriteContext} that holds transactional context
information.
+ * @param ch The {@link ReliableChannel} used to resolve the actual
communication channel.
+ * @return Whether the error was handled or not.
+ */
+ public static boolean tryHandleErrorOnFirstRequest(WriteContext ctx,
ReliableChannel ch) {
+ if (ctx.firstReqFut == null) {
+ return false;
+ }
+
+ // Create failed transaction.
+ ClientTransaction failed = new ClientTransaction(
+ ctx.channel,
+ ch,
+ -1,
+ ctx.readOnly,
+ null,
+ ctx.pm,
+ null,
+ ch.observableTimestamp(),
+ 0
+ );
+
+ failed.fail();
+ ctx.firstReqFut.complete(failed);
+ // Txn was not started, rollback is not required.
+ return true;
+ }
+
+ /**
+ * Handles errors after the first request.
+ * Essentially calls rollback on the transaction and adds any rollback error
to the original error as a suppressed exception.
+ *
+ * @param ctx The {@link WriteContext} that holds transactional context
information.
+ * @param tx The client transaction.
+ * @param err The error to be handled.
+ * @param <T> type of the expected future.
+ * @return A completable future that always fails with the original error
plus any suppressed errors.
+ */
+ public static <T> CompletableFuture<T>
handleErrorOnOtherRequests(WriteContext ctx, ClientTransaction tx, Throwable
err) {
+ CompletableFuture<Void> rollback;
+ if (ctx.enlistmentToken != null) {
+ // In case of a direct mapping error we need to roll back the tx on the
coordinator.
+ rollback = tx.rollbackAsync();
+ } else {
+ rollback = tx.rollbackAndDiscardDirectMappings(false);
+ }
+
+ return rollback.handle((ignored, err0) -> {
+ if (err0 != null) {
+ err.addSuppressed(err0);
+ }
+
+ sneakyThrow(err);
+ return null;
+ });
+ }
+
private static CompletableFuture<ClientChannel> resolveChannelInner(
WriteContext ctx,
ReliableChannel ch,