This is an automated email from the ASF dual-hosted git repository.
ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8b5964b45b0 IGNITE-27838 Fix directly mapped transactions rollback on
enlistment failure (#7588)
8b5964b45b0 is described below
commit 8b5964b45b09dc82f7af1c056099860af19c1a20
Author: Aleksei Scherbakov <[email protected]>
AuthorDate: Thu Feb 26 17:25:58 2026 +0300
IGNITE-27838 Fix directly mapped transactions rollback on enlistment
failure (#7588)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../ignite/internal/client/proto/ClientOp.java | 4 +
.../internal/client/proto/ErrorExtensions.java | 2 +
.../client/proto/ProtocolBitmaskFeature.java | 7 +-
.../ignite/client/handler/ItClientHandlerTest.java | 1 +
.../ignite/client/handler/ClientHandlerModule.java | 3 +-
.../handler/ClientInboundMessageHandler.java | 14 +-
.../handler/requests/table/ClientTableCommon.java | 8 +
.../requests/tx/ClientTransactionBeginRequest.java | 17 +-
.../tx/ClientTransactionDiscardRequest.java | 77 +++++
.../internal/client/ClientDelayedAckException.java | 4 +-
.../client/ClientTransactionInflights.java | 26 +-
.../apache/ignite/internal/client/ClientUtils.java | 1 +
.../ignite/internal/client/ReliableChannel.java | 19 +-
.../ignite/internal/client/TcpClientChannel.java | 30 +-
.../ignite/internal/client/sql/ClientSql.java | 56 +++-
.../ignite/internal/client/table/ClientTable.java | 45 +--
.../internal/client/tx/ClientLazyTransaction.java | 25 +-
.../internal/client/tx/ClientTransaction.java | 211 ++++++++++---
.../ClientTransactionKilledException.java} | 35 ++-
.../internal/client/tx/ClientTransactions.java | 5 +-
.../ignite/internal/client/tx/DirectTxUtils.java | 6 +-
.../apache/ignite/client/ClientMetricsTest.java | 4 +-
.../client/ClientTransactionInflightTest.java | 23 +-
.../org/apache/ignite/client/RetryPolicyTest.java | 2 +-
.../apache/ignite/client/fakes/FakeTxManager.java | 8 +-
.../RepeatedFinishClientTransactionTest.java | 32 +-
.../ignite/internal/util/CollectionUtils.java | 22 ++
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
.../app/client/ItAbstractThinClientTest.java | 18 +-
.../app/client/ItThinClientTransactionsTest.java | 342 +++++++++++++++++++--
.../distributed/storage/InternalTableImplTest.java | 8 +-
...sactionTest.java => ItKillTransactionTest.java} | 2 +-
.../ignite/internal/tx/InternalTransaction.java | 4 +
.../ignite/internal/tx/InternalTxOptions.java | 21 +-
.../tx/TransactionExceptionMapperProvider.java | 2 +
.../internal/tx/TransactionKilledException.java} | 36 +--
.../org/apache/ignite/internal/tx/TxManager.java | 14 +-
.../tx/impl/IgniteAbstractTransactionImpl.java | 4 +-
.../internal/tx/impl/ReadWriteTransactionImpl.java | 54 +++-
.../internal/tx/impl/TransactionInflights.java | 23 --
.../internal/tx/impl/TxCleanupRequestHandler.java | 30 ++
.../internal/tx/impl/TxCleanupRequestSender.java | 2 +-
.../ignite/internal/tx/impl/TxManagerImpl.java | 14 +-
.../tx/AbstractDeadlockPreventionTest.java | 18 ++
.../tx/impl/ReadOnlyTransactionImplTest.java | 3 +-
.../tx/impl/ReadWriteTransactionImplTest.java | 6 +-
49 files changed, 1050 insertions(+), 246 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index eab048a4a09..5835775d2bc 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -468,6 +468,9 @@ public class ErrorGroups {
/** Operation failed due to replication delayed ack failure. */
public static final int TX_DELAYED_ACK_ERR =
TX_ERR_GROUP.registerErrorCode((short) 17);
+
+ /** Transaction was internally killed. This is retriable state. */
+ public static final int TX_KILLED_ERR =
TX_ERR_GROUP.registerErrorCode((short) 18);
}
/** Replicator error group. */
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
index d302934264c..cd9729c7f40 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ClientOp.java
@@ -203,6 +203,9 @@ public class ClientOp {
/** Get next result set. */
public static final int SQL_CURSOR_NEXT_RESULT_SET = 74;
+ /** Discard request for directly mapped transactions. */
+ public static final int TX_DISCARD = 75;
+
/** Reserved for extensions: min. */
@SuppressWarnings("unused")
public static final int RESERVED_EXTENSION_RANGE_START = 1000;
@@ -257,6 +260,7 @@ public class ClientOp {
OP_MASK.set(STREAMER_BATCH_SEND);
OP_MASK.set(TX_COMMIT);
OP_MASK.set(TX_ROLLBACK);
+ OP_MASK.set(TX_DISCARD);
OP_MASK.set(TUPLE_GET_ALL);
OP_MASK.set(TUPLE_CONTAINS_ALL_KEYS);
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
index c7ce644f608..3ddd836ecea 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ErrorExtensions.java
@@ -26,4 +26,6 @@ public class ErrorExtensions {
public static final String SQL_UPDATE_COUNTERS = "sql-update-counters";
public static final String DELAYED_ACK = "delayed-ack";
+
+ public static final String TX_KILL = "tx-kill";
}
diff --git
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
index 363070971d0..a908678aebc 100644
---
a/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
+++
b/modules/client-common/src/main/java/org/apache/ignite/internal/client/proto/ProtocolBitmaskFeature.java
@@ -107,7 +107,12 @@ public enum ProtocolBitmaskFeature {
/**
* Client supports Partition Awareness for SQL queries with table name in
the response metadata.
*/
- SQL_PARTITION_AWARENESS_TABLE_NAME(16);
+ SQL_PARTITION_AWARENESS_TABLE_NAME(16),
+
+ /**
+ * Send discard requests to directly mapped partitions.
+ */
+ TX_DIRECT_MAPPING_SEND_DISCARD(17);
private static final EnumSet<ProtocolBitmaskFeature>
ALL_FEATURES_AS_ENUM_SET =
EnumSet.allOf(ProtocolBitmaskFeature.class);
diff --git
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
index 746b1262f9b..943ceb20877 100644
---
a/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
+++
b/modules/client-handler/src/integrationTest/java/org/apache/ignite/client/handler/ItClientHandlerTest.java
@@ -562,6 +562,7 @@ public class ItClientHandlerTest extends
BaseIgniteAbstractTest {
expected.set(14);
expected.set(15);
expected.set(16);
+ expected.set(17);
assertEquals(expected, supportedFeatures);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
index 8bb881142b7..1d37822cc79 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientHandlerModule.java
@@ -100,7 +100,8 @@ public class ClientHandlerModule implements
IgniteComponent, PlatformComputeTran
ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
- ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME
+ ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS_TABLE_NAME,
+ ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD
));
/** Connection id generator.
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
index 94d8da215ac..036719498db 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/ClientInboundMessageHandler.java
@@ -120,6 +120,7 @@ import
org.apache.ignite.client.handler.requests.table.ClientTupleUpsertRequest;
import
org.apache.ignite.client.handler.requests.table.partition.ClientTablePartitionPrimaryReplicasNodesGetRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionBeginRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionCommitRequest;
+import
org.apache.ignite.client.handler.requests.tx.ClientTransactionDiscardRequest;
import
org.apache.ignite.client.handler.requests.tx.ClientTransactionRollbackRequest;
import org.apache.ignite.compute.JobExecutionContext;
import org.apache.ignite.deployment.DeploymentUnitInfo;
@@ -171,6 +172,7 @@ import
org.apache.ignite.internal.table.IgniteTablesInternal;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersions;
import org.apache.ignite.internal.table.distributed.schema.SchemaVersionsImpl;
import org.apache.ignite.internal.tx.DelayedAckException;
+import org.apache.ignite.internal.tx.TransactionKilledException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.CancelHandle;
@@ -701,10 +703,13 @@ public class ClientInboundMessageHandler
SchemaVersionMismatchException schemaVersionMismatchException =
findException(err, SchemaVersionMismatchException.class);
SqlBatchException sqlBatchException = findException(err,
SqlBatchException.class);
DelayedAckException delayedAckException = findException(err,
DelayedAckException.class);
+ TransactionKilledException transactionKilledException =
findException(err, TransactionKilledException.class);
err = firstNotNull(
schemaVersionMismatchException,
sqlBatchException,
+ delayedAckException,
+ transactionKilledException,
ExceptionUtils.unwrapCause(err)
);
@@ -746,6 +751,10 @@ public class ClientInboundMessageHandler
packer.packInt(1); // 1 extension.
packer.packString(ErrorExtensions.DELAYED_ACK);
packer.packUuid(delayedAckException.txId());
+ } else if (transactionKilledException != null) {
+ packer.packInt(1); // 1 extension.
+ packer.packString(ErrorExtensions.TX_KILL);
+ packer.packUuid(transactionKilledException.txId());
} else {
packer.packNil(); // No extensions.
}
@@ -947,7 +956,7 @@ public class ClientInboundMessageHandler
return ClientJdbcPrimaryKeyMetadataRequest.process(in,
jdbcQueryEventHandler);
case ClientOp.TX_BEGIN:
- return ClientTransactionBeginRequest.process(in, txManager,
resources, metrics, tsTracker);
+ return ClientTransactionBeginRequest.process(in, txManager,
resources, metrics, tsTracker, notificationSender(requestId));
case ClientOp.TX_COMMIT:
return ClientTransactionCommitRequest.process(in, resources,
metrics, clockService, igniteTables,
@@ -1076,6 +1085,9 @@ public class ClientInboundMessageHandler
case ClientOp.TABLE_GET_QUALIFIED:
return ClientTableGetQualifiedRequest.process(in,
igniteTables);
+ case ClientOp.TX_DISCARD:
+ return ClientTransactionDiscardRequest.process(in, txManager,
igniteTables);
+
default:
throw new IgniteException(PROTOCOL_ERR, "Unexpected operation
code: " + opCode);
}
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
index 159afde916d..325f047d7a5 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/table/ClientTableCommon.java
@@ -57,6 +57,7 @@ import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.InternalTxOptions;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TransactionKilledException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.internal.tx.TxPriority;
import org.apache.ignite.internal.tx.TxState;
@@ -488,6 +489,13 @@ public class ClientTableCommon {
builder = builder.timeoutMillis(timeoutMillis);
}
+ builder.killClosure(notificationSender == null ? tx -> {} : tx
-> {
+ // Exception will be ignored if a client doesn't support
it.
+ TransactionKilledException err = new
TransactionKilledException(tx.id(), txManager);
+
+ notificationSender.sendNotification(w -> {}, err,
NULL_HYBRID_TIMESTAMP);
+ });
+
InternalTxOptions txOptions = builder.build();
var tx = startExplicitTx(tsUpdater, txManager,
HybridTimestamp.nullableHybridTimestamp(observableTs), readOnly, txOptions);
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
index 73d7266425c..b2550da6a23 100644
---
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionBeginRequest.java
@@ -18,17 +18,20 @@
package org.apache.ignite.client.handler.requests.tx;
import static
org.apache.ignite.client.handler.requests.table.ClientTableCommon.startExplicitTx;
+import static
org.apache.ignite.internal.hlc.HybridTimestamp.NULL_HYBRID_TIMESTAMP;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.client.handler.ClientHandlerMetricSource;
import org.apache.ignite.client.handler.ClientResource;
import org.apache.ignite.client.handler.ClientResourceRegistry;
+import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.tx.InternalTxOptions;
+import org.apache.ignite.internal.tx.TransactionKilledException;
import org.apache.ignite.internal.tx.TxManager;
/**
@@ -42,6 +45,7 @@ public class ClientTransactionBeginRequest {
* @param txManager Transactions.
* @param resources Resources.
* @param metrics Metrics.
+ * @param notificationSender The sender.
* @return Future.
*/
public static CompletableFuture<ResponseWriter> process(
@@ -49,7 +53,8 @@ public class ClientTransactionBeginRequest {
TxManager txManager,
ClientResourceRegistry resources,
ClientHandlerMetricSource metrics,
- HybridTimestampTracker tsTracker
+ HybridTimestampTracker tsTracker,
+ NotificationSender notificationSender
) throws IgniteInternalCheckedException {
boolean readOnly = in.unpackBoolean();
long timeoutMillis = in.unpackLong();
@@ -63,6 +68,12 @@ public class ClientTransactionBeginRequest {
InternalTxOptions txOptions = InternalTxOptions.builder()
.timeoutMillis(timeoutMillis)
.readTimestamp(observableTs)
+ .killClosure(notificationSender == null ? tx -> {} : tx -> {
+ // Exception will be ignored if a client doesn't support
it.
+ TransactionKilledException err = new
TransactionKilledException(tx.id(), txManager);
+
+ notificationSender.sendNotification(w -> {}, err,
NULL_HYBRID_TIMESTAMP);
+ })
.build();
var tx = startExplicitTx(tsTracker, txManager, observableTs, readOnly,
txOptions);
@@ -76,9 +87,7 @@ public class ClientTransactionBeginRequest {
long resourceId = resources.put(new ClientResource(tx,
tx::rollbackAsync));
metrics.transactionsActiveIncrement();
- return CompletableFuture.completedFuture(out -> {
- out.packLong(resourceId);
- });
+ return CompletableFuture.completedFuture(out ->
out.packLong(resourceId));
} catch (IgniteInternalCheckedException e) {
tx.rollback();
throw e;
diff --git
a/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
new file mode 100644
index 00000000000..e343917ea71
--- /dev/null
+++
b/modules/client-handler/src/main/java/org/apache/ignite/client/handler/requests/tx/ClientTransactionDiscardRequest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.client.handler.requests.tx;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.client.handler.ResponseWriter;
+import org.apache.ignite.internal.client.proto.ClientMessageUnpacker;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.table.IgniteTablesInternal;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
+import org.apache.ignite.internal.tx.TxManager;
+import org.apache.ignite.internal.tx.impl.EnlistedPartitionGroup;
+
+/**
+ * Client transaction direct mapping discard request.
+ */
+public class ClientTransactionDiscardRequest {
+ /**
+ * Processes the request.
+ *
+ * @param in The unpacker.
+ * @param igniteTables Tables facade.
+ * @return The future.
+ */
+ public static CompletableFuture<ResponseWriter> process(
+ ClientMessageUnpacker in,
+ TxManager txManager,
+ IgniteTablesInternal igniteTables
+ ) throws IgniteInternalCheckedException {
+ Map<ZonePartitionId, PendingTxPartitionEnlistment> enlistedPartitions
= new HashMap<>();
+
+ UUID txId = in.unpackUuid();
+
+ int cnt = in.unpackInt(); // Number of direct enlistments.
+ for (int i = 0; i < cnt; i++) {
+ int tableId = in.unpackInt();
+ int partId = in.unpackInt();
+
+ TableViewInternal table = igniteTables.cachedTable(tableId);
+
+ if (table != null) {
+ ZonePartitionId replicationGroupId =
table.internalTable().targetReplicationGroupId(partId);
+ enlistedPartitions.computeIfAbsent(replicationGroupId, k ->
new PendingTxPartitionEnlistment(null, 0))
+ .addTableId(tableId);
+ }
+ }
+
+ List<EnlistedPartitionGroup> enlistedPartitionGroups =
enlistedPartitions.entrySet().stream()
+ .map(entry -> new EnlistedPartitionGroup(entry.getKey(),
entry.getValue().tableIds()))
+ .collect(toList());
+
+ return txManager.discardLocalWriteIntents(enlistedPartitionGroups,
txId).handle((res, err) -> null);
+ }
+}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
index ef9e06513b5..da09484db96 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
@@ -48,9 +48,9 @@ public class ClientDelayedAckException extends
IgniteInternalException {
}
/**
- * Gets expected schema version.
+ * Returns a related transaction id.
*
- * @return Expected schema version.
+ * @return The id.
*/
public UUID txId() {
return txId;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
index fbfeb1d9611..5a597fffb98 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientTransactionInflights.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFu
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
@@ -40,12 +41,13 @@ public class ClientTransactionInflights {
/**
* Registers the inflight update for a transaction.
*
- * @param txId The transaction id.
+ * @param tx The transaction.
*/
- public void addInflight(UUID txId) {
- txCtxMap.compute(txId, (uuid, ctx) -> {
+ public void addInflight(ClientTransaction tx) {
+ txCtxMap.compute(tx.txId(), (uuid, ctx) -> {
if (ctx == null) {
ctx = new TxContext();
+ ctx.tx = tx;
}
ctx.addInflight();
@@ -125,6 +127,23 @@ public class ClientTransactionInflights {
txCtxMap.remove(uuid);
}
+ /**
+ * Returns tracked directly mapped transaction.
+ *
+ * @param id The id.
+ *
+ * @return The transaction or {@code null}.
+ */
+ public @Nullable ClientTransaction trackedTransaction(UUID id) {
+ TxContext txContext = txCtxMap.get(id);
+
+ if (txContext != null) {
+ return txContext.tx;
+ }
+
+ return null;
+ }
+
/**
* Check if the inflights map contains a given transaction.
*
@@ -142,6 +161,7 @@ public class ClientTransactionInflights {
public CompletableFuture<Void> finishFut;
public long inflights = 0;
public Throwable err;
+ public ClientTransaction tx;
void addInflight() {
inflights++;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
index f8661224bc1..3f4aaa122f7 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ClientUtils.java
@@ -141,6 +141,7 @@ public class ClientUtils {
case ClientOp.TX_BEGIN:
case ClientOp.TX_COMMIT:
case ClientOp.TX_ROLLBACK:
+ case ClientOp.TX_DISCARD:
return null; // Commit/rollback use owning connection and
bypass retry mechanism.
case ClientOp.JDBC_SQL_EXEC_PS_BATCH:
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
index 036d34f43c1..3b54a00f353 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/ReliableChannel.java
@@ -399,7 +399,24 @@ public final class ReliableChannel implements
AutoCloseable {
}
/**
- * Get the channel.
+ * Gets the existing channel.
+ *
+ * @param nodeName Node name.
+ *
+ * @return The channel or {@code null} if connection is not available.
+ */
+ public @Nullable ClientChannel getNodeChannel(String nodeName) {
+ ClientChannelHolder holder = nodeChannelsByName.get(nodeName);
+
+ if (holder == null || holder.close) {
+ return null;
+ }
+
+ return holder.getNow();
+ }
+
+ /**
+ * Gets the channel.
*
* @param preferredNodeName Preferred node name.
*
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
index 7ead14efb64..938b19a4e5a 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/TcpClientChannel.java
@@ -65,6 +65,8 @@ import org.apache.ignite.internal.client.proto.HandshakeUtils;
import org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature;
import org.apache.ignite.internal.client.proto.ProtocolVersion;
import org.apache.ignite.internal.client.proto.ResponseFlags;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.client.tx.ClientTransactionKilledException;
import org.apache.ignite.internal.future.timeout.TimeoutObject;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.properties.IgniteProductVersion;
@@ -100,7 +102,8 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ProtocolBitmaskFeature.TX_CLIENT_GETALL_SUPPORTS_TX_OPTIONS,
ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT,
ProtocolBitmaskFeature.COMPUTE_OBSERVABLE_TS,
- ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES
+ ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_REMOTE_WRITES,
+ ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD
));
/** Minimum supported heartbeat interval. */
@@ -603,6 +606,13 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
ClientDelayedAckException err0 =
(ClientDelayedAckException) err;
inflights.removeInflight(err0.txId(), new
TransactionException(err0.code(), err0.getMessage(), err0.getCause()));
+ } else if (err instanceof ClientTransactionKilledException) {
+ ClientTransactionKilledException err0 =
(ClientTransactionKilledException) err;
+
+ ClientTransaction tx =
inflights.trackedTransaction(err0.txId());
+ if (tx != null) {
+ tx.discardDirectMappings(true);
+ }
}
// Can't do anything to remove stuck inflight.
@@ -635,8 +645,6 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
int extSize = unpacker.tryUnpackNil() ? 0 : unpacker.unpackInt();
int expectedSchemaVersion = -1;
- long[] sqlUpdateCounters = null;
- UUID txId = null;
for (int i = 0; i < extSize; i++) {
String key = unpacker.unpackString();
@@ -644,24 +652,18 @@ class TcpClientChannel implements ClientChannel,
ClientMessageHandler, ClientCon
if (key.equals(ErrorExtensions.EXPECTED_SCHEMA_VERSION)) {
expectedSchemaVersion = unpacker.unpackInt();
} else if (key.equals(ErrorExtensions.SQL_UPDATE_COUNTERS)) {
- sqlUpdateCounters = unpacker.unpackLongArray();
+ return new SqlBatchException(traceId, code,
unpacker.unpackLongArray(),
+ errMsg != null ? errMsg : "SQL batch execution error",
causeWithStackTrace);
} else if (key.equals(ErrorExtensions.DELAYED_ACK)) {
- txId = unpacker.unpackUuid();
+ return new ClientDelayedAckException(traceId, code, errMsg,
unpacker.unpackUuid(), causeWithStackTrace);
+ } else if (key.equals(ErrorExtensions.TX_KILL)) {
+ return new ClientTransactionKilledException(traceId, code,
errMsg, unpacker.unpackUuid(), causeWithStackTrace);
} else {
// Unknown extension - ignore.
unpacker.skipValues(1);
}
}
- if (txId != null) {
- return new ClientDelayedAckException(traceId, code, errMsg, txId,
causeWithStackTrace);
- }
-
- if (sqlUpdateCounters != null) {
- errMsg = errMsg != null ? errMsg : "SQL batch execution error";
- return new SqlBatchException(traceId, code, sqlUpdateCounters,
errMsg, causeWithStackTrace);
- }
-
if (code == Table.SCHEMA_VERSION_MISMATCH_ERR) {
if (expectedSchemaVersion == -1) {
return new IgniteException(
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
index 2360f0f4f95..92c368a4eea 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/sql/ClientSql.java
@@ -17,12 +17,16 @@
package org.apache.ignite.internal.client.sql;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_DIRECT_TX_MAPPING;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_MULTISTATEMENT_SUPPORT;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.SQL_PARTITION_AWARENESS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DELAYED_ACKS;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import com.github.benmanes.caffeine.cache.Cache;
@@ -36,6 +40,7 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
import org.apache.ignite.internal.binarytuple.BinaryTupleBuilder;
import org.apache.ignite.internal.client.ClientUtils;
import org.apache.ignite.internal.client.PartitionMapping;
@@ -147,7 +152,7 @@ public class ClientSql implements IgniteSql {
try {
return new SyncResultSetAdapter<>(executeAsync(transaction,
cancellationToken, query, arguments).join());
} catch (CompletionException e) {
- throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
}
@@ -164,7 +169,7 @@ public class ClientSql implements IgniteSql {
try {
return new SyncResultSetAdapter<>(executeAsync(transaction,
cancellationToken, statement, arguments).join());
} catch (CompletionException e) {
- throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
}
@@ -182,7 +187,7 @@ public class ClientSql implements IgniteSql {
try {
return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, cancellationToken, query, arguments).join());
} catch (CompletionException e) {
- throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
}
@@ -200,7 +205,7 @@ public class ClientSql implements IgniteSql {
try {
return new SyncResultSetAdapter<>(executeAsync(transaction,
mapper, cancellationToken, statement, arguments).join());
} catch (CompletionException e) {
- throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
}
@@ -226,7 +231,7 @@ public class ClientSql implements IgniteSql {
try {
return executeBatchAsync(transaction, cancellationToken,
dmlStatement, batch).join();
} catch (CompletionException e) {
- throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
}
@@ -244,7 +249,7 @@ public class ClientSql implements IgniteSql {
try {
executeScriptAsync(cancellationToken, query, arguments).join();
} catch (CompletionException e) {
- throw
ExceptionUtils.sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
+ throw sneakyThrow(ExceptionUtils.copyExceptionWithCause(e));
}
}
@@ -366,7 +371,38 @@ public class ClientSql implements IgniteSql {
() -> DirectTxUtils.resolveChannel(ctx, ch,
shouldTrackOperation, tx, mapping),
null,
false
- )).exceptionally(ClientSql::handleException);
+ ).handle((BiFunction<AsyncResultSet<T>, Throwable,
CompletableFuture<AsyncResultSet<T>>>) (r, err) -> {
+ if (err != null) {
+ if (tx == null || !shouldTrackOperation) {
+ return failedFuture(err);
+ }
+
+ if (ctx.enlistmentToken != null) {
+ // In case of direct mapping error need to rollback the tx
on coordinator.
+ return tx.rollbackAsync().handle((ignored, err0) -> {
+ if (err0 != null) {
+ err.addSuppressed(err0);
+ }
+
+ sneakyThrow(err);
+ return null;
+ });
+ } else {
+ // In case of unrecoverable error the tx is already rolled
back on coordinator.
+ // Need to additionally cleanup directly mapped parts.
+ return tx.discardDirectMappings(false).handle((ignored,
err0) -> {
+ if (err0 != null) {
+ err.addSuppressed(err0);
+ }
+
+ sneakyThrow(err);
+ return null;
+ });
+ }
+ }
+
+ return completedFuture(r);
+ })).thenCompose(identity()).exceptionally(ClientSql::handleException);
}
private static @Nullable PartitionMapping resolveMapping(
@@ -404,7 +440,7 @@ public class ClientSql implements IgniteSql {
boolean sqlDirectMappingSupported =
r.clientChannel().protocolContext().isFeatureSupported(SQL_DIRECT_TX_MAPPING);
boolean sqlMultistatementsSupported =
r.clientChannel().protocolContext().allFeaturesSupported(SQL_MULTISTATEMENT_SUPPORT);
- DirectTxUtils.readTx(r, ctx, tx, ch.observableTimestamp());
+ DirectTxUtils.readTx(r, ch, ctx, tx, ch.observableTimestamp());
ClientAsyncResultSet<T> rs = new ClientAsyncResultSet<>(
r.clientChannel(), marshallers, r.in(), mapper,
tryUnpackPaMeta, sqlDirectMappingSupported, sqlMultistatementsSupported
);
@@ -556,7 +592,7 @@ public class ClientSql implements IgniteSql {
.thenCompose(tx ->
tx.channel().serviceAsync(ClientOp.SQL_EXEC_BATCH, payloadWriter,
payloadReader))
.exceptionally(ClientSql::handleException);
} catch (TransactionException e) {
- return CompletableFuture.failedFuture(new
SqlException(e.traceId(), e.code(), e.getMessage(), e));
+ return failedFuture(new SqlException(e.traceId(), e.code(),
e.getMessage(), e));
}
}
@@ -647,7 +683,7 @@ public class ClientSql implements IgniteSql {
throw new SqlException(te.traceId(), te.code(), te.getMessage(),
te);
}
- throw ExceptionUtils.sneakyThrow(ex);
+ throw sneakyThrow(ex);
}
private static class PaCacheKey {
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
index 04167245bc6..bb499258db0 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/table/ClientTable.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static
org.apache.ignite.internal.client.table.ClientTableMapUtils.mapAndRetry;
import static
org.apache.ignite.internal.client.table.ClientTableMapUtils.reduceWithKeepOrder;
+import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -504,21 +505,21 @@ public class ClientTable implements Table {
.thenCompose(t -> loadSchemaAndReadData(t,
reader))
.handle((ret, ex) -> {
if (ex != null) {
- // Retry schema errors, if any.
Throwable cause = ex;
if (ctx.firstReqFut != null) {
// Create failed transaction.
- ClientTransaction failed = new
ClientTransaction(ctx.channel, id, ctx.readOnly, null, ctx.pm,
- null,
ch.observableTimestamp(), 0);
+ ClientTransaction failed = new
ClientTransaction(ctx.channel, ch, id, ctx.readOnly, null,
+ ctx.pm, null,
ch.observableTimestamp(), 0);
failed.fail();
ctx.firstReqFut.complete(failed);
+ // Txn was not started, rollback
is not required.
fut.completeExceptionally(unwrapCause(ex));
return null;
}
- // Don't attempt retrying in case of
direct mapping. This may be improved in the future.
if (ctx.enlistmentToken == null) {
+ // Retry schema errors, if any, in
proxy mode.
while (cause != null) {
if (cause instanceof
ClientSchemaVersionMismatchException) {
// Retry with specific
schema version.
@@ -527,13 +528,7 @@ public class ClientTable implements Table {
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue,
responseSchemaRequired,
provider,
retryPolicyOverride, expectedVersion, expectNotifications, tx)
-
.whenComplete((res0, err0) -> {
- if (err0 !=
null) {
-
fut.completeExceptionally(err0);
- } else {
-
fut.complete(res0);
- }
- });
+
.whenComplete(copyStateTo(fut));
return null;
} else if
(schemaVersionOverride == null && cause instanceof UnmappedColumnsException) {
@@ -544,13 +539,7 @@ public class ClientTable implements Table {
doSchemaOutInOpAsync(opCode, writer, reader, defaultValue,
responseSchemaRequired,
provider,
retryPolicyOverride, UNKNOWN_SCHEMA_VERSION, expectNotifications, tx)
-
.whenComplete((res0, err0) -> {
- if (err0 !=
null) {
-
fut.completeExceptionally(err0);
- } else {
-
fut.complete(res0);
- }
- });
+
.whenComplete(copyStateTo(fut));
return null;
}
@@ -558,9 +547,23 @@ public class ClientTable implements Table {
cause = cause.getCause();
}
- fut.completeExceptionally(ex);
+ if (tx0 == null) {
+ fut.completeExceptionally(ex);
+ } else {
+ // In case of unrecoverable
error the tx is already rolled back on coordinator.
+ // We need to additionally
cleanup directly mapped parts.
+
tx0.discardDirectMappings(false).handle((ignored, err0) -> {
+ if (err0 != null) {
+ ex.addSuppressed(err0);
+ }
+
+
fut.completeExceptionally(ex);
+
+ return (T) null;
+ });
+ }
} else {
- // In case of direct mapping
failure for any reason try to roll back the transaction.
+ // In case of direct mapping error
we need to rollback the tx on coordinator.
tx0.rollbackAsync().handle((ignored, err0) -> {
if (err0 != null) {
ex.addSuppressed(err0);
@@ -597,7 +600,7 @@ public class ClientTable implements Table {
@Nullable ClientTransaction tx0
) {
ClientMessageUnpacker in1 = in.in();
- DirectTxUtils.readTx(in, ctx, tx0, ch.observableTimestamp());
+ DirectTxUtils.readTx(in, ch, ctx, tx0, ch.observableTimestamp());
int schemaVer = in1.unpackInt();
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
index 46ddef2f9bc..1b2d70f3a21 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientLazyTransaction.java
@@ -28,6 +28,8 @@ import org.apache.ignite.internal.client.ReliableChannel;
import org.apache.ignite.internal.client.proto.tx.ClientInternalTxOptions;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.lang.IgniteBiTuple;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ViewUtils;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
@@ -68,14 +70,7 @@ public class ClientLazyTransaction implements Transaction {
@Override
public void commit() throws TransactionException {
- var tx0 = tx;
-
- if (tx0 == null) {
- // No operations were performed, nothing to commit.
- return;
- }
-
- tx0.join().commit();
+ ViewUtils.sync(commitAsync());
}
@Override
@@ -92,14 +87,7 @@ public class ClientLazyTransaction implements Transaction {
@Override
public void rollback() throws TransactionException {
- var tx0 = tx;
-
- if (tx0 == null) {
- // No operations were performed, nothing to rollback.
- return;
- }
-
- tx0.join().rollback();
+ ViewUtils.sync(rollbackAsync());
}
@Override
@@ -228,4 +216,9 @@ public class ClientLazyTransaction implements Transaction {
@Nullable EnumSet<ClientInternalTxOptions> options() {
return txOptions;
}
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
index d1c31601921..414ef106c4e 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransaction.java
@@ -18,20 +18,25 @@
package org.apache.ignite.internal.client.tx;
import static java.util.function.Function.identity;
+import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_DIRECT_MAPPING_SEND_DISCARD;
import static
org.apache.ignite.internal.client.proto.ProtocolBitmaskFeature.TX_PIGGYBACK;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ViewUtils.sync;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_KILLED_ERR;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.PartitionMapping;
@@ -43,6 +48,7 @@ import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.tostring.IgniteToStringExclude;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
@@ -66,6 +72,9 @@ public class ClientTransaction implements Transaction {
/** Rolled back state. */
private static final int STATE_ROLLED_BACK = 2;
+ /** Kill state. */
+ private static final int STATE_KILLED = 3;
+
/** Channel that the transaction belongs to. */
@IgniteToStringExclude
private final ClientChannel ch;
@@ -75,7 +84,7 @@ public class ClientTransaction implements Transaction {
/** The future used on repeated commit/rollback. */
@IgniteToStringExclude
- private final AtomicReference<CompletableFuture<Void>> finishFut = new
AtomicReference<>();
+ private volatile CompletableFuture<Void> finishFut;
/** State. */
private final AtomicInteger state = new AtomicInteger(STATE_OPEN);
@@ -105,10 +114,13 @@ public class ClientTransaction implements Transaction {
@IgniteToStringExclude
private final ReentrantReadWriteLock enlistPartitionLock = new
ReentrantReadWriteLock();
+ private final ReliableChannel reliableChannel;
+
/**
* Constructor.
*
- * @param ch Channel that the transaction belongs to.
+ * @param ch Channel that the transaction belongs to (coordinator
connection).
+ * @param reliableChannel Channels repository.
* @param id Transaction id.
* @param isReadOnly Read-only flag.
* @param txId Transaction id.
@@ -119,6 +131,7 @@ public class ClientTransaction implements Transaction {
*/
public ClientTransaction(
ClientChannel ch,
+ ReliableChannel reliableChannel,
long id,
boolean isReadOnly,
UUID txId,
@@ -128,6 +141,7 @@ public class ClientTransaction implements Transaction {
long timeout
) {
this.ch = ch;
+ this.reliableChannel = reliableChannel;
this.id = id;
this.isReadOnly = isReadOnly;
this.txId = txId;
@@ -206,14 +220,92 @@ public class ClientTransaction implements Transaction {
sync(commitAsync());
}
+ /**
+ * Discards the directly mapped transaction fragments in case of
coordinator side transaction invalidation
+ * (either kill or implicit rollback due to mapping failure, see
postEnlist).
+ *
+ * @param killed Killed flag.
+ *
+ * @return The future.
+ */
+ public CompletableFuture<Void> discardDirectMappings(boolean killed) {
+ enlistPartitionLock.writeLock().lock();
+
+ try {
+ if (finishFut != null) {
+ return finishFut;
+ } else {
+ finishFut = new CompletableFuture<>();
+ }
+ } finally {
+ enlistPartitionLock.writeLock().unlock();
+ }
+
+ return sendDiscardRequests().handle((r, e) -> {
+ setState(killed ? STATE_KILLED : STATE_ROLLED_BACK);
+ ch.inflights().erase(txId());
+ this.finishFut.complete(null);
+ return null;
+ });
+ }
+
+ private CompletableFuture<Void> sendDiscardRequests() {
+ assert finishFut != null;
+
+ if
(!ch.protocolContext().isFeatureSupported(TX_DIRECT_MAPPING_SEND_DISCARD)) {
+ return nullCompletedFuture();
+ }
+
+ Map<String, List<TablePartitionId>> enlistments = new HashMap<>();
+
+ for (Entry<TablePartitionId, CompletableFuture<IgniteBiTuple<String,
Long>>> entry : enlisted.entrySet()) {
+ IgniteBiTuple<String, Long> info = entry.getValue().getNow(null);
+
+ if (info == null) {
+ continue; // Ignore incomplete enlistments.
+ }
+
+ enlistments.computeIfAbsent(info.get1(), k -> new
ArrayList<>()).add(entry.getKey());
+ }
+
+ List<CompletableFuture<Void>> futures = new
ArrayList<>(enlistments.size());
+
+ for (Entry<String, List<TablePartitionId>> entry :
enlistments.entrySet()) {
+ ClientChannel ch = reliableChannel.getNodeChannel(entry.getKey());
+
+ if (ch == null) {
+ // Connection is lost, the transaction will be cleaned up by
other means.
+ // TODO https://issues.apache.org/jira/browse/IGNITE-27651
+ continue;
+ }
+
+ CompletableFuture<Void> discardFut =
ch.serviceAsync(ClientOp.TX_DISCARD, w -> {
+ int cnt = entry.getValue().size();
+ w.out().packUuid(txId);
+ w.out().packInt(cnt);
+
+ for (int i = 0; i < cnt; i++) {
+ w.out().packInt(entry.getValue().get(i).tableId());
+ w.out().packInt(entry.getValue().get(i).partitionId());
+ }
+ }, null);
+
+ futures.add(discardFut);
+ }
+
+ return allOf(futures);
+ }
+
/** {@inheritDoc} */
@Override
public CompletableFuture<Void> commitAsync() {
enlistPartitionLock.writeLock().lock();
try {
- if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
- return finishFut.get();
+ if (finishFut != null) {
+ return finishFut;
+ } else {
+ finishFut = new CompletableFuture<>();
}
} finally {
enlistPartitionLock.writeLock().unlock();
@@ -245,9 +337,23 @@ public class ClientTransaction implements Transaction {
}).thenCompose(identity());
mainFinishFut.handle((res, e) -> {
+ if (e != null) {
+ // Failed to commit for some reason, need to discard direct
mappings.
+ Throwable cause = ExceptionUtils.unwrapCause(e);
+
+ sendDiscardRequests().handle((r, e0) -> {
+ setState(cause instanceof ClientTransactionKilledException
? STATE_KILLED : STATE_ROLLED_BACK);
+ ch.inflights().erase(txId());
+ this.finishFut.complete(null);
+ return null;
+ });
+
+ return null;
+ }
+
setState(STATE_COMMITTED);
ch.inflights().erase(txId());
- this.finishFut.get().complete(null);
+ this.finishFut.complete(null);
return null;
});
@@ -266,8 +372,10 @@ public class ClientTransaction implements Transaction {
enlistPartitionLock.writeLock().lock();
try {
- if (!finishFut.compareAndSet(null, new CompletableFuture<>())) {
- return finishFut.get();
+ if (finishFut != null) {
+ return finishFut;
+ } else {
+ finishFut = new CompletableFuture<>();
}
} finally {
enlistPartitionLock.writeLock().unlock();
@@ -285,7 +393,7 @@ public class ClientTransaction implements Transaction {
mainFinishFut.handle((res, e) -> {
setState(STATE_ROLLED_BACK);
ch.inflights().erase(txId());
- this.finishFut.get().complete(null);
+ this.finishFut.complete(null);
return null;
});
@@ -346,9 +454,22 @@ public class ClientTransaction implements Transaction {
return clientTx;
}
- throw new TransactionException(
- TX_ALREADY_FINISHED_ERR,
- format("Transaction is already finished [tx={}].", clientTx));
+ throw exceptionForState(state, clientTx);
+ }
+
+ private static TransactionException exceptionForState(int state,
ClientTransaction clientTx) {
+ assert state > STATE_OPEN;
+
+ if (state == STATE_KILLED) {
+ return new TransactionException(
+ TX_KILLED_ERR,
+ format("Transaction is killed [tx={}].", clientTx));
+ } else {
+ return new TransactionException(
+ TX_ALREADY_FINISHED_ERR,
+ format("Transaction is already finished [tx={},
committed={}].", clientTx,
+ state == STATE_COMMITTED ? "true" : "false"));
+ }
}
static IgniteException unsupportedTxTypeException(Transaction tx) {
@@ -362,8 +483,8 @@ public class ClientTransaction implements Transaction {
}
private void checkEnlistPossible() {
- if (finishFut.get() != null) {
- throw new TransactionException(TX_ALREADY_FINISHED_ERR,
format("Transaction is already finished [tx={}].", this));
+ if (finishFut != null) {
+ throw exceptionForState(state.get(), this);
}
}
@@ -379,39 +500,36 @@ public class ClientTransaction implements Transaction {
*/
public CompletableFuture<IgniteBiTuple<String, Long>>
enlistFuture(ReliableChannel ch, ClientChannel opChannel, PartitionMapping pm,
boolean trackOperation) {
- if (!enlistPartitionLock.readLock().tryLock()) {
- throw new TransactionException(TX_ALREADY_FINISHED_ERR,
format("Transaction is already finished [tx={}].", this));
- }
+ enlistPartitionLock.readLock().lock();
- checkEnlistPossible();
-
- boolean[] first = {false};
-
- TablePartitionId tablePartitionId = new TablePartitionId(pm.tableId(),
pm.partition());
+ try {
+ checkEnlistPossible();
- CompletableFuture<IgniteBiTuple<String, Long>> fut =
enlisted.compute(tablePartitionId, (k, v) -> {
- if (v == null) {
- first[0] = true;
- return new CompletableFuture<>();
- } else {
- return v;
- }
- });
+ boolean[] first = {false};
- enlistPartitionLock.readLock().unlock();
+ TablePartitionId tablePartitionId = new
TablePartitionId(pm.tableId(), pm.partition());
- // Re-check after unlock.
- checkEnlistPossible();
+ CompletableFuture<IgniteBiTuple<String, Long>> fut =
enlisted.compute(tablePartitionId, (k, v) -> {
+ if (v == null) {
+ first[0] = true;
+ return new CompletableFuture<>();
+ } else {
+ return v;
+ }
+ });
- if (trackOperation) {
- ch.inflights().addInflight(txId);
- }
+ if (trackOperation) {
+ ch.inflights().addInflight(this);
+ }
- if (first[0]) {
- // For the first request return completed future.
- return CompletableFuture.completedFuture(new IgniteBiTuple<>(null,
null));
- } else {
- return fut;
+ if (first[0]) {
+ // For the first request return completed future.
+ return CompletableFuture.completedFuture(new
IgniteBiTuple<>(null, null));
+ } else {
+ return fut;
+ }
+ } finally {
+ enlistPartitionLock.readLock().unlock();
}
}
@@ -464,11 +582,20 @@ public class ClientTransaction implements Transaction {
/** Fail the transaction. */
public void fail() {
state.set(STATE_ROLLED_BACK);
- finishFut.set(nullCompletedFuture());
+ finishFut = nullCompletedFuture();
}
@Override
public String toString() {
return S.toString(this);
}
+
+ /**
+ * Returns a killed state.
+ *
+ * @return The value.
+ */
+ public boolean killed() {
+ return state.get() == STATE_KILLED;
+ }
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
similarity index 55%
copy from
modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
copy to
modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
index ef9e06513b5..bbbe3eee7fe 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactionKilledException.java
@@ -15,16 +15,16 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client;
+package org.apache.ignite.internal.client.tx;
import java.util.UUID;
-import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
/**
- * Holds the transaction id and the cause for delayed replication ack failure.
+ * Reports a killed transaction.
*/
-public class ClientDelayedAckException extends IgniteInternalException {
+public class ClientTransactionKilledException extends TransactionException {
/** Serial version uid. */
private static final long serialVersionUID = 0L;
@@ -36,21 +36,38 @@ public class ClientDelayedAckException extends
IgniteInternalException {
*
* @param traceId Trace ID.
* @param code Error code.
- *
* @param message String message.
* @param txId Related transaction id.
- * @param cause Cause.
+ * @param cause The cause.
*/
- ClientDelayedAckException(UUID traceId, int code, @Nullable String
message, UUID txId, @Nullable Throwable cause) {
+ public ClientTransactionKilledException(UUID traceId, int code, @Nullable
String message, UUID txId, @Nullable Throwable cause) {
super(traceId, code, message, cause);
this.txId = txId;
}
/**
- * Gets expected schema version.
+ * Constructor (for copying purposes).
+ *
+ * @param traceId Trace ID.
+ * @param code Error code.
+ * @param message String message.
+ * @param cause The cause.
+ */
+ public ClientTransactionKilledException(UUID traceId, int code, @Nullable
String message, @Nullable Throwable cause) {
+ super(traceId, code, message, cause);
+
+ if (cause instanceof ClientTransactionKilledException) {
+ this.txId = ((ClientTransactionKilledException) cause).txId;
+ } else {
+ throw new IllegalArgumentException("Copy constructor should only
be called with the source exception");
+ }
+ }
+
+ /**
+ * Returns a related transaction id.
*
- * @return Expected schema version.
+ * @return The id.
*/
public UUID txId() {
return txId;
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
index 71557894d5c..cf445a80cb9 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/ClientTransactions.java
@@ -90,7 +90,7 @@ public class ClientTransactions implements IgniteTransactions
{
w.out().packLong(timeout);
w.out().packLong(observableTimestamp);
},
- r -> readTx(r, readOnly, timeout),
+ r -> readTx(r, ch, readOnly, timeout),
channelResolver,
null,
false);
@@ -98,6 +98,7 @@ public class ClientTransactions implements IgniteTransactions
{
private static ClientTransaction readTx(
PayloadInputChannel r,
+ ReliableChannel ch,
boolean isReadOnly,
long timeout
) {
@@ -105,6 +106,6 @@ public class ClientTransactions implements
IgniteTransactions {
long id = in.unpackLong();
- return new ClientTransaction(r.clientChannel(), id, isReadOnly, EMPTY,
null, EMPTY, null, timeout);
+ return new ClientTransaction(r.clientChannel(), ch, id, isReadOnly,
EMPTY, null, EMPTY, null, timeout);
}
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
index 51e4a53941c..8e0ea9085a7 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
+++
b/modules/client/src/main/java/org/apache/ignite/internal/client/tx/DirectTxUtils.java
@@ -92,8 +92,8 @@ public class DirectTxUtils {
IgniteBiTuple<CompletableFuture<ClientTransaction>, Boolean> tuple
= ClientLazyTransaction.ensureStarted(tx, ch,
piggybackSupported.test(ch0) ? null : () ->
completedFuture(ch0));
- // If this is the first direct request in transaction, it will
also piggyback a transaction start.
if (tuple.get2()) {
+ // If this is the first direct request in transaction, it will
also piggyback a transaction start.
ctx.pm = pm;
ctx.readOnly = tx.isReadOnly();
ctx.channel = ch0;
@@ -186,12 +186,14 @@ public class DirectTxUtils {
* ensuring the transaction state is correctly initialized or synchronized
with the server.
*
* @param payloadChannel The {@link PayloadInputChannel} containing the
server's response data.
+ * @param ch Channels repository.
* @param ctx The {@link WriteContext} holding the transaction request
state and response future.
* @param tx The current {@link ClientTransaction}, or {@code null} if
piggybacking a new transaction.
* @param observableTimestamp A tracker for observable timestamps used for
transaction visibility and causality.
*/
public static void readTx(
PayloadInputChannel payloadChannel,
+ ReliableChannel ch,
WriteContext ctx,
@Nullable ClientTransaction tx,
HybridTimestampTracker observableTimestamp
@@ -206,7 +208,7 @@ public class DirectTxUtils {
long timeout = in.unpackLong();
ClientTransaction startedTx =
- new ClientTransaction(payloadChannel.clientChannel(), id,
ctx.readOnly, txId, ctx.pm, coordId, observableTimestamp,
+ new ClientTransaction(payloadChannel.clientChannel(), ch,
id, ctx.readOnly, txId, ctx.pm, coordId, observableTimestamp,
timeout);
ctx.firstReqFut.complete(startedTx);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
index 24f8a114d34..41422194096 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientMetricsTest.java
@@ -263,14 +263,14 @@ public class ClientMetricsTest extends
BaseIgniteAbstractTest {
server = AbstractClientTest.startServer(1000, new FakeIgnite());
client = clientBuilder().build();
- assertEquals(17, metrics().bytesSent());
+ assertEquals(18, metrics().bytesSent());
long handshakeReceived = metrics().bytesReceived();
assertThat(handshakeReceived, greaterThanOrEqualTo(77L));
client.tables().tables();
- assertEquals(23, metrics().bytesSent());
+ assertEquals(24, metrics().bytesSent());
assertEquals(handshakeReceived + 21, metrics().bytesReceived());
}
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
b/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
index 2590cc1911e..3dc17f6c1f7 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/ClientTransactionInflightTest.java
@@ -23,24 +23,35 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.client.ClientTransactionInflights;
+import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher;
import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
/**
* Tests inflight tracker.
*/
-public class ClientTransactionInflightTest {
+public class ClientTransactionInflightTest extends BaseIgniteAbstractTest {
private final UUID txId = UUID.randomUUID();
private final ClientTransactionInflights inflights = new
ClientTransactionInflights();
+ private final ClientTransaction transaction;
+
+ {
+ transaction = mock(ClientTransaction.class);
+ Mockito.when(transaction.txId()).thenReturn(txId);
+ }
+
@Test
public void testState1() {
- inflights.addInflight(txId);
+ inflights.addInflight(transaction);
assertEquals(1, inflights.map().get(txId).inflights);
}
@@ -52,7 +63,7 @@ public class ClientTransactionInflightTest {
@Test
public void testState3() {
- inflights.addInflight(txId);
+ inflights.addInflight(transaction);
inflights.removeInflight(txId, null);
CompletableFuture<Void> fut = inflights.finishFuture(txId);
@@ -61,7 +72,7 @@ public class ClientTransactionInflightTest {
@Test
public void testState4() {
- inflights.addInflight(txId);
+ inflights.addInflight(transaction);
CompletableFuture<Void> fut = inflights.finishFuture(txId);
assertFalse(fut.isDone());
@@ -73,7 +84,7 @@ public class ClientTransactionInflightTest {
@Test
public void testState5() {
- inflights.addInflight(txId);
+ inflights.addInflight(transaction);
CompletableFuture<Void> fut = inflights.finishFuture(txId);
assertFalse(fut.isDone());
@@ -85,7 +96,7 @@ public class ClientTransactionInflightTest {
@Test
public void testState6() {
- inflights.addInflight(txId);
+ inflights.addInflight(transaction);
inflights.removeInflight(txId, new TestException());
CompletableFuture<Void> fut = inflights.finishFuture(txId);
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
index be5ca44560b..8b77025a8ee 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/RetryPolicyTest.java
@@ -235,7 +235,7 @@ public class RetryPolicyTest extends BaseIgniteAbstractTest
{
}
}
- long expectedNullCount = 24;
+ long expectedNullCount = 25;
String msg = nullOpFields.size()
+ " operation codes do not have public equivalent. When adding
new codes, update ClientOperationType too. Missing ops: "
diff --git
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
index 4955eb80d04..938db2fd479 100644
---
a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
+++
b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeTxManager.java
@@ -21,6 +21,7 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -253,7 +254,7 @@ public class FakeTxManager implements TxManager {
@Override
public CompletableFuture<Void> cleanup(
ZonePartitionId commitPartitionId,
- Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+ Map<ZonePartitionId, ? extends PartitionEnlistment>
enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
@@ -287,6 +288,11 @@ public class FakeTxManager implements TxManager {
return nullCompletedFuture();
}
+ @Override
+ public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
+ return nullCompletedFuture();
+ }
+
@Override
public int lockRetryCount() {
return 0;
diff --git
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
index e1b50698efd..5d750fa5fa0 100644
---
a/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
+++
b/modules/client/src/test/java/org/apache/ignite/internal/client/RepeatedFinishClientTransactionTest.java
@@ -28,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
+import static org.junit.jupiter.params.provider.Arguments.argumentSet;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
@@ -36,12 +37,17 @@ import static org.mockito.Mockito.when;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Stream;
import org.apache.ignite.internal.client.proto.ClientOp;
import org.apache.ignite.internal.client.tx.ClientTransaction;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.tx.TransactionException;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
/**
@@ -63,7 +69,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
}
};
- ClientTransaction tx = new ClientTransaction(clientChannel, 1, false,
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ ClientTransaction tx = new ClientTransaction(clientChannel, null, 1,
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
CompletableFuture<Object> fut = new CompletableFuture<>();
@@ -109,7 +115,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
}
};
- ClientTransaction tx = new ClientTransaction(clientChannel, 1, false,
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ ClientTransaction tx = new ClientTransaction(clientChannel, null, 1,
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
CompletableFuture<Object> fut = new CompletableFuture<>();
@@ -151,7 +157,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
when(clientChannel.protocolContext()).thenReturn(ctx);
when(clientChannel.serviceAsync(anyInt(), any(),
any())).thenReturn(failedFuture(new Exception("Expected exception.")));
- ClientTransaction tx = new ClientTransaction(clientChannel, 1, false,
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ ClientTransaction tx = new ClientTransaction(clientChannel, null, 1,
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
CompletableFuture<Object> fut = new CompletableFuture<>();
@@ -182,7 +188,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
when(clientChannel.protocolContext()).thenReturn(ctx);
when(clientChannel.serviceAsync(anyInt(), any(),
any())).thenReturn(failedFuture(new Exception("Expected exception.")));
- ClientTransaction tx = new ClientTransaction(clientChannel, 1, false,
randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ ClientTransaction tx = new ClientTransaction(clientChannel, null, 1,
false, randomUUID(), null, randomUUID(), EMPTY_TS_PROVIDER, 0);
CompletableFuture<Object> fut = new CompletableFuture<>();
@@ -217,7 +223,7 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
PartitionMapping pm = new PartitionMapping(1, "test", 1);
- ClientTransaction tx = new ClientTransaction(clientChannel, 1, false,
randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ ClientTransaction tx = new ClientTransaction(clientChannel, ch, 1,
false, randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
tx.commit();
@@ -233,8 +239,16 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
}
}
- @Test
- public void testEnlistFailAfterRollback() {
+ private static Stream<Arguments> rollbackClosureFactory() {
+ return Stream.of(
+ argumentSet("rollback", (Consumer<ClientTransaction>)
ClientTransaction::rollback),
+ argumentSet("discard", (Consumer<ClientTransaction>)
clientTransaction -> clientTransaction.discardDirectMappings(false))
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("rollbackClosureFactory")
+ public void testEnlistFailAfterRollback(Consumer<ClientTransaction>
rollbackClo) {
ReliableChannel ch = mock(ReliableChannel.class,
Mockito.RETURNS_DEEP_STUBS);
TestClientChannel clientChannel = mock(TestClientChannel.class,
Mockito.RETURNS_DEEP_STUBS);
@@ -248,9 +262,9 @@ public class RepeatedFinishClientTransactionTest extends
BaseIgniteAbstractTest
PartitionMapping pm = new PartitionMapping(1, "test", 1);
- ClientTransaction tx = new ClientTransaction(clientChannel, 1, false,
randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
+ ClientTransaction tx = new ClientTransaction(clientChannel, ch, 1,
false, randomUUID(), pm, randomUUID(), EMPTY_TS_PROVIDER, 0);
- tx.rollback();
+ rollbackClo.accept(tx);
WriteContext wc = new WriteContext(emptyTracker(),
ClientOp.TUPLE_UPSERT);
wc.pm = pm;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 2e73be1e17a..04d7af04902 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -96,6 +96,28 @@ public final class CollectionUtils {
return iter == null || !iter.hasNext();
}
+ /**
+ * Count values in iterator.
+ *
+ * @param iter The iterator.
+ *
+ * @return The count.
+ */
+ public static int count(@Nullable Iterator<?> iter) {
+ if (iter == null) {
+ return 0;
+ }
+
+ int cnt = 0;
+
+ while (iter.hasNext()) {
+ iter.next();
+ cnt++;
+ }
+
+ return cnt;
+ }
+
/**
* Gets first element from given list or returns {@code null} if list is
empty.
*
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index 23f91076ba3..4bca391ce42 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -139,6 +139,7 @@ enum class code : underlying_t {
TX_STALE_READ_ONLY_OPERATION = 0x7000f,
TX_ALREADY_FINISHED_WITH_TIMEOUT = 0x70010,
TX_DELAYED_ACK = 0x70011,
+ TX_KILLED = 0x70012,
// Replicator group. Group code: 8
REPLICA_COMMON = 0x80001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 5e6581e7378..fd177463f2f 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -210,6 +210,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::TX_STALE_READ_ONLY_OPERATION:
case error::code::TX_ALREADY_FINISHED_WITH_TIMEOUT:
case error::code::TX_DELAYED_ACK:
+ case error::code::TX_KILLED:
return sql_state::S25000_INVALID_TRANSACTION_STATE;
// Replicator group. Group code: 8
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 80ebf868d8d..06abe213e46 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -370,6 +370,9 @@ namespace Apache.Ignite
/// <summary> TxDelayedAck error. </summary>
public const int TxDelayedAck = (GroupCode << 16) | (17 & 0xFFFF);
+
+ /// <summary> TxKilled error. </summary>
+ public const int TxKilled = (GroupCode << 16) | (18 & 0xFFFF);
}
/// <summary> Replicator errors. </summary>
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
index 47fa23e374f..d2517738151 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItAbstractThinClientTest.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteServer;
import org.apache.ignite.InitParameters;
@@ -84,6 +85,17 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
private IgniteClient client;
private List<IgniteServer> nodes;
+ /**
+ * Enables stack traces for a node.
+ *
+ * @param nodeIdx Node index to test.
+ *
+ * @return The flag value.
+ */
+ protected boolean enableTracesPredicate(int nodeIdx) {
+ return nodeIdx == 1;
+ }
+
/**
* Before all.
*/
@@ -97,7 +109,7 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
"ignite {\n"
+ " network.port: " + (3344 + i) + ",\n"
+ " network.nodeFinder.netClusterNodes: [
\"localhost:3344\" ]\n"
- + (i == 1 ? ("
clientConnector.sendServerExceptionStackTraceToClient: true\n"
+ + (enableTracesPredicate(i) ? ("
clientConnector.sendServerExceptionStackTraceToClient: true\n"
+ " clientConnector.metricsEnabled: true\n") : "")
+ " clientConnector.port: " + (10800 + i) + ",\n"
+ " rest.port: " + (10300 + i) + ",\n"
@@ -211,6 +223,10 @@ public abstract class ItAbstractThinClientTest extends
BaseIgniteAbstractTest {
return startedNodes.get(idx);
}
+ protected Ignite server(ClusterNode node) {
+ return IntStream.range(0, nodes()).mapToObj(this::server).filter(n ->
n.name().equals(node.name())).findFirst().orElseThrow();
+ }
+
protected ClusterNode node(int idx) {
return sortedNodes().get(idx);
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
index 4445fe3444a..e8fa68cacb2 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/client/ItThinClientTransactionsTest.java
@@ -17,11 +17,13 @@
package org.apache.ignite.internal.runner.app.client;
+import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Comparator.comparing;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.startsWith;
@@ -32,13 +34,16 @@ import static
org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.params.provider.Arguments.argumentSet;
import java.lang.reflect.Field;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
@@ -50,22 +55,29 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.ignite.Ignite;
import org.apache.ignite.client.IgniteClient;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.client.ClientChannel;
import org.apache.ignite.internal.client.ClientTransactionInflights;
import org.apache.ignite.internal.client.TcpIgniteClient;
+import org.apache.ignite.internal.client.sql.ClientSql;
+import org.apache.ignite.internal.client.sql.PartitionMappingProvider;
import org.apache.ignite.internal.client.table.ClientTable;
import org.apache.ignite.internal.client.tx.ClientLazyTransaction;
import org.apache.ignite.internal.client.tx.ClientTransaction;
+import org.apache.ignite.internal.lang.IgniteTriFunction;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.TxState;
+import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.ErrorGroups.Transactions;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.sql.ResultSet;
+import org.apache.ignite.sql.SqlException;
import org.apache.ignite.sql.SqlRow;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.RecordView;
@@ -78,8 +90,11 @@ import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionException;
import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
@@ -393,13 +408,15 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
tx.rollback();
}
- @Test
- void testKillTransaction() {
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ void testKillTransaction(KillTestContext ctx) {
@SuppressWarnings("resource") IgniteClient client = client();
- KeyValueView<Integer, String> kvView = kvView();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
Transaction tx = client.transactions().begin();
- kvView.put(tx, 1, "1");
+ Tuple key = key(0);
+ assertThat(ctx.put.apply(client, tx, key), willSucceedFast());
try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
cursor.forEachRemaining(r -> {
@@ -411,7 +428,152 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
TransactionException ex = assertThrows(TransactionException.class,
tx::commit);
assertThat(ex.getMessage(), startsWith("Transaction is killed"));
- assertEquals("IGN-TX-13", ex.codeAsString());
+ assertEquals("IGN-TX-18", ex.codeAsString());
+
+ assertThat(kvView.removeAsync(key), willSucceedFast());
+ }
+
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ void testEnlistmentAfterKillTransaction(KillTestContext ctx) {
+ @SuppressWarnings("resource") IgniteClient client = client();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table().partitionDistribution().primaryReplicasAsync().join();
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ List<Tuple> tuples = generateKeysForNode(100, 10, map,
server0.cluster().localNode(), table());
+
+ Transaction tx = client.transactions().begin();
+ Tuple key = tuples.get(0);
+ assertThat(ctx.put.apply(client, tx, key), willSucceedFast());
+
+ try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+ cursor.forEachRemaining(r -> {
+ String txId = r.stringValue("TRANSACTION_ID");
+ client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+ });
+ }
+
+ Tuple key2 = tuples.get(1);
+ assertThat(ctx.put.apply(client, tx, key2),
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is
killed"));
+
+ assertThat(tx.commitAsync(), willSucceedFast());
+
+ // Validate lock possibility.
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key, key2)),
willSucceedFast());
+ }
+
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ void testKillTransactionAfterSecondEnlistment(KillTestContext ctx) {
+ @SuppressWarnings("resource") IgniteClient client = client();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table().partitionDistribution().primaryReplicasAsync().join();
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ List<Tuple> tuples = generateKeysForNode(100, 10, map,
server0.cluster().localNode(), table());
+
+ Transaction tx = client.transactions().begin();
+ Tuple key = tuples.get(0);
+ assertThat(ctx.put.apply(client, tx, key), willSucceedFast());
+
+ Tuple key2 = tuples.get(1);
+ assertThat(ctx.put.apply(client, tx, key2), willSucceedFast());
+
+ try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+ cursor.forEachRemaining(r -> {
+ String txId = r.stringValue("TRANSACTION_ID");
+ client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+ });
+ }
+
+ assertThat(tx.commitAsync(),
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is
killed"));
+
+ // Validate lock possibility.
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key, key2)),
willSucceedFast());
+ }
+
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ void testDirectEnlistmentAfterKillTransaction(KillTestContext ctx) {
+ @SuppressWarnings("resource") IgniteClient client = client();
+ ClientSql sql = (ClientSql) client.sql();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table().partitionDistribution().primaryReplicasAsync().join();
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+ List<Tuple> tuples = generateKeysForNode(100, 10, map,
server0.cluster().localNode(), table());
+ List<Tuple> tuples2 = generateKeysForNode(100, 10, map,
server1.cluster().localNode(), table());
+
+ // Init mappings.
+ Tuple key0 = tuples.get(0);
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+ await().atMost(2, TimeUnit.SECONDS)
+ .until(() ->
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+ Transaction tx = client.transactions().begin();
+ Tuple key1 = tuples.get(1);
+ assertThat(ctx.put.apply(client, tx, key1), willSucceedFast());
+
+ try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+ cursor.forEachRemaining(r -> {
+ String txId = r.stringValue("TRANSACTION_ID");
+ client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+ });
+ }
+
+ // No direct enlistments exists at this point so put will succeed.
This can be improved.
+ Tuple key2 = tuples2.get(0);
+ assertThat(ctx.put.apply(client, tx, key2), willSucceedFast());
+
+ assertThat(tx.commitAsync(),
willThrowWithCauseOrSuppressed(TransactionException.class, "Transaction is
killed"));
+
+ // Validate lock possibility.
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key1,
key2)), willSucceedFast());
+ }
+
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ void testKillSqlTransactionAfterSecondDirectEnlistment(KillTestContext
ctx) {
+ @SuppressWarnings("resource") IgniteClient client = client();
+ ClientSql sql = (ClientSql) client.sql();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table().partitionDistribution().primaryReplicasAsync().join();
+ IgniteImpl server0 = unwrapIgniteImpl(server(0));
+ IgniteImpl server1 = unwrapIgniteImpl(server(1));
+ List<Tuple> tuples = generateKeysForNode(100, 10, map,
server0.cluster().localNode(), table());
+ List<Tuple> tuples2 = generateKeysForNode(100, 10, map,
server1.cluster().localNode(), table());
+
+ // Init mappings.
+ Tuple key0 = tuples.get(0);
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+ await().atMost(2, TimeUnit.SECONDS)
+ .until(() ->
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+ ClientLazyTransaction tx = (ClientLazyTransaction)
client.transactions().begin();
+ Tuple key1 = tuples.get(1);
+ assertThat(ctx.put.apply(client, tx, key1), willSucceedFast());
+
+ Tuple key2 = tuples2.get(0);
+ assertThat(ctx.put.apply(client, tx, key2), willSucceedFast());
+
+ try (ResultSet<SqlRow> cursor = client.sql().execute("SELECT
TRANSACTION_ID FROM SYSTEM.TRANSACTIONS")) {
+ cursor.forEachRemaining(r -> {
+ String txId = r.stringValue("TRANSACTION_ID");
+ client.sql().executeScript("KILL TRANSACTION '" + txId + "'");
+ });
+ }
+
+ await().atMost(3, TimeUnit.SECONDS).until(() ->
tx.startedTx().killed());
+
+ assertThat(tx.commitAsync(), willSucceedFast());
+
+ // Validate lock possibility.
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key1,
key2)), willSucceedFast());
}
@Test
@@ -425,7 +587,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertEquals(PARTITIONS, map.size());
- int k = 111; // Avoid intersection with previous tests.
+ int k = 111111; // Avoid intersection with previous tests.
Map<Tuple, Tuple> txMap = new HashMap<>();
@@ -835,7 +997,7 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
// Expecting each write operation to trigger add/remove events.
int exp = 20 + partitions;
- Mockito.verify(spyed,
Mockito.times(exp)).addInflight(tx0.startedTx().txId());
+ Mockito.verify(spyed, Mockito.times(exp)).addInflight(tx0.startedTx());
Mockito.verify(spyed,
Mockito.times(exp)).removeInflight(Mockito.eq(tx0.startedTx().txId()),
Mockito.any());
// Check if all locks are released.
@@ -1183,38 +1345,154 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
assertNull(val, "Read-only transaction should not see values committed
after its start");
}
- @Test
- public void testRollbackDoesNotBlockOnLockConflict() {
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ public void testRollbackDoesNotBlockOnLockConflict(KillTestContext ctx) {
ClientTable table = (ClientTable) table();
+ ClientSql sql = (ClientSql) client().sql();
KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicasAsync().join();
- List<Tuple> tuples0 = generateKeysForPartition(800, 10, map, 0, table);
+ Entry<Partition, ClusterNode> mapping =
map.entrySet().iterator().next();
+ List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int)
mapping.getKey().id(), table);
+ Ignite server = server(mapping.getValue());
+
+ // Init SQL mappings.
+ Tuple key0 = tuples0.get(0);
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+ await().atMost(2, TimeUnit.SECONDS)
+ .until(() ->
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
ClientLazyTransaction olderTxProxy = (ClientLazyTransaction)
client().transactions().begin();
ClientLazyTransaction youngerTxProxy = (ClientLazyTransaction)
client().transactions().begin();
- Tuple key = tuples0.get(0);
- Tuple key2 = tuples0.get(1);
- Tuple val = val("1");
- Tuple val2 = val("2");
+ Tuple key = tuples0.get(1);
+ Tuple key2 = tuples0.get(2);
- kvView.put(olderTxProxy, key, val);
+ assertThat(ctx.put.apply(client(), olderTxProxy, key),
willSucceedFast());
ClientTransaction olderTx = olderTxProxy.startedTx();
- kvView.put(youngerTxProxy, key2, val2);
+ assertThat(ctx.put.apply(client(), youngerTxProxy, key2),
willSucceedFast());
ClientTransaction youngerTx = youngerTxProxy.startedTx();
assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
// Older is allowed to wait with wait-die.
- CompletableFuture<Void> fut = kvView.putAsync(olderTxProxy, key2, val);
+ CompletableFuture<?> fut = ctx.put.apply(client(), olderTxProxy, key2);
assertFalse(fut.isDone());
+ IgniteImpl ignite = unwrapIgniteImpl(server);
+
+ await().atMost(2, TimeUnit.SECONDS).until(() -> {
+ Iterator<Lock> locks =
ignite.txManager().lockManager().locks(olderTx.txId());
+
+ return CollectionUtils.count(locks) == 2;
+ });
+
assertThat(olderTxProxy.rollbackAsync(), willSucceedFast());
// Operation future should be failed.
- assertThat(fut,
willThrowWithCauseOrSuppressed(TransactionException.class));
+ assertThat(fut, willThrowWithCauseOrSuppressed(ctx.expectedErr));
+
+ // Ensure inflights cleanup.
+ assertThat(youngerTxProxy.rollbackAsync(), willSucceedFast());
+
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key,
key2)), willSucceedFast());
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-27947")
+ public void testRollbackDoesNotBlockOnLockConflictDuringFirstRequest()
throws InterruptedException {
+ // Note: reversed tx priority is required for this test.
+ ClientTable table = (ClientTable) table();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicasAsync().join();
+ List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, 0, table);
+
+ // We need a waiter for this scenario.
+ Tuple key = tuples0.get(0);
+ Tuple val = val("1");
+
+ ClientLazyTransaction tx1 = (ClientLazyTransaction)
client().transactions().begin();
+ ClientLazyTransaction tx2 = (ClientLazyTransaction)
client().transactions().begin();
+
+ kvView.put(tx1, key, val);
+
+ // Will wait for lock.
+ CompletableFuture<Void> fut2 = kvView.putAsync(tx2, key, val);
+ assertFalse(fut2.isDone());
+
+ Thread.sleep(500);
+
+ // Rollback should not be blocked.
+ assertThat(tx2.rollbackAsync(), willSucceedFast());
+ assertThat(tx1.rollbackAsync(), willSucceedFast());
+ }
+
+ @ParameterizedTest
+ @MethodSource("killTestContextFactory")
+ public void
testRollbackDoesNotBlockOnLockConflictWithDirectMapping(KillTestContext ctx) {
+ ClientTable table = (ClientTable) table();
+ ClientSql sql = (ClientSql) client().sql();
+ KeyValueView<Tuple, Tuple> kvView = table().keyValueView();
+
+ Map<Partition, ClusterNode> map =
table.partitionDistribution().primaryReplicasAsync().join();
+ Entry<Partition, ClusterNode> p0 = null;
+ Entry<Partition, ClusterNode> p1 = null;
+ for (Entry<Partition, ClusterNode> entry : map.entrySet()) {
+ if (p0 == null) {
+ p0 = entry;
+ } else if (!p0.getValue().equals(entry.getValue())) {
+ p1 = entry;
+ break;
+ }
+ }
+
+ // Expecting at least one partition on different node.
+ assertNotNull(p0);
+ assertNotNull(p1);
+
+ List<Tuple> tuples0 = generateKeysForPartition(100, 10, map, (int)
p0.getKey().id(), table);
+ List<Tuple> tuples1 = generateKeysForPartition(100, 10, map, (int)
p1.getKey().id(), table);
+
+ // Init SQL mappings.
+ Tuple key0 = tuples0.get(0);
+ sql.execute(format("INSERT INTO %s (%s, %s) VALUES (?, ?)",
TABLE_NAME, COLUMN_KEY, COLUMN_VAL),
+ key0.intValue(0), key0.intValue(0) + "");
+ await().atMost(2, TimeUnit.SECONDS)
+ .until(() ->
sql.partitionAwarenessCachedMetas().stream().allMatch(PartitionMappingProvider::ready));
+
+ ClientLazyTransaction olderTxProxy = (ClientLazyTransaction)
client().transactions().begin();
+ ClientLazyTransaction youngerTxProxy = (ClientLazyTransaction)
client().transactions().begin();
+
+ Tuple key = tuples0.get(1);
+ Tuple key2 = tuples0.get(2);
+ Tuple key3 = tuples1.get(0);
+ Tuple key4 = tuples1.get(1);
+
+ assertThat(ctx.put.apply(client(), olderTxProxy, key),
willSucceedFast());
+
+ ClientTransaction olderTx = olderTxProxy.startedTx();
+
+ assertThat(ctx.put.apply(client(), youngerTxProxy, key2),
willSucceedFast());
+
+ ClientTransaction youngerTx = youngerTxProxy.startedTx();
+
+ assertTrue(olderTx.txId().compareTo(youngerTx.txId()) < 0);
+
+ // Should be directly mapped
+ assertThat(ctx.put.apply(client(), youngerTxProxy, key3),
willSucceedFast());
+
+ // Younger is not allowed to wait with wait-die.
+ // Next operation should invalidate the transaction.
+ assertThat(ctx.put.apply(client(), youngerTxProxy, key),
willThrowWithCauseOrSuppressed(ctx.expectedErr));
+
+ olderTxProxy.commit();
+
+ // Ensure all enlisted keys are unlocked.
+ assertThat(kvView.removeAllAsync(null, Arrays.asList(key0, key, key2,
key3, key4)), willSucceedFast());
}
@AfterEach
@@ -1277,4 +1555,32 @@ public class ItThinClientTransactionsTest extends
ItAbstractThinClientTest {
return Objects.hash(key, val);
}
}
+
+ private static Stream<Arguments> killTestContextFactory() {
+ return Stream.of(
+ argumentSet("kv", new
KillTestContext(TransactionException.class,
ItThinClientTransactionsTest::putKv)),
+ argumentSet("sql", new KillTestContext(SqlException.class,
ItThinClientTransactionsTest::putSql))
+ );
+ }
+
+ private static CompletableFuture<?> putSql(IgniteClient client,
Transaction tx, Tuple key) {
+ return client.sql()
+ .executeAsync(tx, format("INSERT INTO %s (%s, %s) VALUES (?,
?)", TABLE_NAME, COLUMN_KEY, COLUMN_VAL), key.intValue(0),
+ key.intValue(0) + "");
+ }
+
+ private static CompletableFuture<?> putKv(IgniteClient client, Transaction
tx, Tuple key) {
+ return client.tables().tables().get(0).keyValueView().putAsync(tx,
key, val(key.intValue(0) + ""));
+ }
+
+ private static class KillTestContext {
+ final Class<? extends Exception> expectedErr;
+ final IgniteTriFunction<IgniteClient, Transaction, Tuple,
CompletableFuture<?>> put;
+
+ public KillTestContext(Class<? extends Exception> expectedErr,
+ IgniteTriFunction<IgniteClient, Transaction, Tuple,
CompletableFuture<?>> put) {
+ this.expectedErr = expectedErr;
+ this.put = put;
+ }
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
index aaf913d0dbd..14c5a4837f4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImplTest.java
@@ -187,8 +187,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
TestTransactionIds.newTransactionId(),
randomUUID(),
true, // implicit
- 10_000
- );
+ 10_000,
+ null);
});
lenient().when(replicaService.invoke(anyString(),
any())).then(invocation -> {
@@ -411,8 +411,8 @@ public class InternalTableImplTest extends
BaseIgniteAbstractTest {
TestTransactionIds.newTransactionId(),
randomUUID(),
false,
- 10_000
- );
+ 10_000,
+ null);
}
private InternalTransaction newReadOnlyTransaction() {
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItKillTransactionTest.java
similarity index 98%
rename from
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java
rename to
modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItKillTransactionTest.java
index b4cb483cf6a..35b7609b677 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/KillTransactionTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/tx/ItKillTransactionTest.java
@@ -47,7 +47,7 @@ import org.junit.jupiter.api.Test;
* The behavior of this API is similar to the rollback invocation,
* but a client has to get a specific exception when trying to interact with
the transaction object.
*/
-public class KillTransactionTest extends ClusterPerClassIntegrationTest {
+public class ItKillTransactionTest extends ClusterPerClassIntegrationTest {
@Override
protected void configureInitParameters(InitParametersBuilder builder) {
super.configureInitParameters(builder);
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
index 0e3b42aa1b3..e1605695da7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTransaction.java
@@ -121,6 +121,10 @@ public interface InternalTransaction extends Transaction {
return false;
}
+ default boolean remoteOnCoordinator() {
+ return false;
+ }
+
/**
* Finishes a read-only transaction with a specific execution timestamp.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
index cea9ef47697..fa952c8ac90 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.tx;
+import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.tx.configuration.TransactionConfigurationSchema;
import org.jetbrains.annotations.Nullable;
@@ -47,11 +48,16 @@ public class InternalTxOptions {
@Nullable
private final HybridTimestamp readTimestamp;
- private InternalTxOptions(TxPriority priority, long timeoutMillis,
@Nullable HybridTimestamp readTimestamp, @Nullable String txLabel) {
+ /** Transaction kill closure. Defines context specific action on tx kill.
*/
+ private final @Nullable Consumer<InternalTransaction> killClosure;
+
+ private InternalTxOptions(TxPriority priority, long timeoutMillis,
@Nullable HybridTimestamp readTimestamp, @Nullable String txLabel,
+ @Nullable Consumer<InternalTransaction> killClosure) {
this.priority = priority;
this.timeoutMillis = timeoutMillis;
this.readTimestamp = readTimestamp;
this.txLabel = txLabel;
+ this.killClosure = killClosure;
}
public static Builder builder() {
@@ -82,6 +88,10 @@ public class InternalTxOptions {
return txLabel;
}
+ public @Nullable Consumer<InternalTransaction> killClosure() {
+ return killClosure;
+ }
+
/** Builder for InternalTxOptions. */
public static class Builder {
private TxPriority priority = TxPriority.NORMAL;
@@ -98,6 +108,8 @@ public class InternalTxOptions {
@Nullable
private String txLabel = null;
+ private Consumer<InternalTransaction> killClosure;
+
public Builder priority(TxPriority priority) {
this.priority = priority;
return this;
@@ -118,8 +130,13 @@ public class InternalTxOptions {
return this;
}
+ public Builder killClosure(Consumer<InternalTransaction> r) {
+ this.killClosure = r;
+ return this;
+ }
+
public InternalTxOptions build() {
- return new InternalTxOptions(priority, timeoutMillis,
readTimestamp, txLabel);
+ return new InternalTxOptions(priority, timeoutMillis,
readTimestamp, txLabel, killClosure);
}
}
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
index da5311e8da7..f0fc334d85e 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
@@ -48,6 +48,8 @@ public class TransactionExceptionMapperProvider implements
IgniteExceptionMapper
err -> new IncompatibleSchemaException(err.traceId(),
err.code(), err.getMessage(), err)));
mappers.add(unchecked(DelayedAckException.class,
err -> new TransactionException(err.traceId(), err.code(),
err.getMessage(), err.getCause())));
+ mappers.add(unchecked(TransactionKilledException.class,
+ err -> new TransactionException(err.traceId(), err.code(),
err.getMessage(), err)));
return mappers;
}
diff --git
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java
similarity index 55%
copy from
modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
copy to
modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java
index ef9e06513b5..0aa02f24c7e 100644
---
a/modules/client/src/main/java/org/apache/ignite/internal/client/ClientDelayedAckException.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionKilledException.java
@@ -15,42 +15,40 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.client;
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.tx.TransactionLogUtils.formatTxInfo;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_KILLED_ERR;
import java.util.UUID;
-import org.apache.ignite.internal.lang.IgniteInternalException;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.tx.RetriableTransactionException;
/**
- * Holds the transaction id and the cause for delayed replication ack failure.
+ * Reports killed transaction.
*/
-public class ClientDelayedAckException extends IgniteInternalException {
- /** Serial version uid. */
+public class TransactionKilledException extends TransactionInternalException
implements RetriableTransactionException {
private static final long serialVersionUID = 0L;
- /** Transaction id. */
private final UUID txId;
/**
- * Constructor.
- *
- * @param traceId Trace ID.
- * @param code Error code.
+ * Creates the exception with txId and optional txManager for label
formatting.
*
- * @param message String message.
- * @param txId Related transaction id.
- * @param cause Cause.
+ * @param txId The transaction id.
+ * @param txManager transaction manager to retrieve label for logging.
*/
- ClientDelayedAckException(UUID traceId, int code, @Nullable String
message, UUID txId, @Nullable Throwable cause) {
- super(traceId, code, message, cause);
-
+ public TransactionKilledException(UUID txId, TxManager txManager) {
+ super(
+ TX_KILLED_ERR,
+ "Transaction is killed " + formatTxInfo(txId, txManager)
+ );
this.txId = txId;
}
/**
- * Gets expected schema version.
+ * Returns a transaction id.
*
- * @return Expected schema version.
+ * @return The id.
*/
public UUID txId() {
return txId;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
index e2e2cb6f619..4d6ecf0f38a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxManager.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -154,9 +155,8 @@ public interface TxManager extends IgniteComponent {
* Returns lock manager.
*
* @return Lock manager for the given transactions manager.
- * @deprecated Use lockManager directly.
*/
- @Deprecated
+ @TestOnly
LockManager lockManager();
/**
@@ -218,7 +218,7 @@ public interface TxManager extends IgniteComponent {
*/
CompletableFuture<Void> cleanup(
@Nullable ZonePartitionId commitPartitionId,
- Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+ Map<ZonePartitionId, ? extends PartitionEnlistment>
enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
@@ -270,6 +270,14 @@ public interface TxManager extends IgniteComponent {
*/
CompletableFuture<Boolean> kill(UUID txId);
+ /**
+ * Discards local write intents. Used together with kill command.
+ *
+ * @param groups Groups.
+ * @param txId Transaction id.
+ */
+ CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId);
+
/**
* Returns lock retry count.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
index 34075b2c78e..baf57bda9c7 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/IgniteAbstractTransactionImpl.java
@@ -63,13 +63,13 @@ public abstract class IgniteAbstractTransactionImpl
implements InternalTransacti
* The constructor.
*
* @param txManager The tx manager.
- * @param id The id.
* @param observableTsTracker Observation timestamp tracker.
+ * @param id The id.
* @param coordinatorId Transaction coordinator inconsistent ID.
* @param implicit True for an implicit transaction, false for an ordinary
one.
* @param timeout Transaction timeout in milliseconds.
*/
- public IgniteAbstractTransactionImpl(
+ IgniteAbstractTransactionImpl(
TxManager txManager,
HybridTimestampTracker observableTsTracker,
UUID id,
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
index ad18054f37e..25b96de1cf1 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImpl.java
@@ -31,11 +31,14 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Consumer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.tx.PendingTxPartitionEnlistment;
import org.apache.ignite.internal.tx.TransactionIds;
+import org.apache.ignite.internal.tx.TransactionKilledException;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.tx.TransactionException;
import org.jetbrains.annotations.Nullable;
@@ -70,6 +73,12 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
*/
private boolean noRemoteWrites = true;
+ /**
+ * A closure which is called then a transaction is externally killed (not
by direct user call).
+ * Not-null only for a client's transaction.
+ */
+ private final @Nullable Consumer<InternalTransaction> killClosure;
+
/**
* Constructs an explicit read-write transaction.
*
@@ -79,6 +88,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
* @param txCoordinatorId Transaction coordinator inconsistent ID.
* @param implicit True for an implicit transaction, false for an ordinary
one.
* @param timeout The timeout.
+ * @param killClosure Kill closure.
*/
public ReadWriteTransactionImpl(
TxManager txManager,
@@ -86,9 +96,11 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
UUID id,
UUID txCoordinatorId,
boolean implicit,
- long timeout
+ long timeout,
+ @Nullable Consumer<InternalTransaction> killClosure
) {
super(txManager, observableTsTracker, id, txCoordinatorId, implicit,
timeout);
+ this.killClosure = killClosure;
}
/** {@inheritDoc} */
@@ -119,8 +131,7 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
) {
// No need to wait for lock if commit is in progress.
if (!enlistPartitionLock.readLock().tryLock()) {
- failEnlist();
- assert false; // Not reachable.
+ throw enlistFailedException();
}
try {
@@ -140,20 +151,20 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
/**
* Fails the operation.
*/
- private void failEnlist() {
- throw new TransactionException(
- TX_ALREADY_FINISHED_ERR,
- format("Transaction is already finished [{}, txState={}].",
- formatTxInfo(id(), txManager, false), state()));
+ private RuntimeException enlistFailedException() {
+ return killed ? new TransactionKilledException(id(), txManager) :
+ new TransactionException(
+ TX_ALREADY_FINISHED_ERR,
+ format("Transaction is already finished [{},
txState={}].",
+ formatTxInfo(id(), txManager, false),
state()));
}
/**
* Checks that this transaction was not finished and will be able to
enlist another partition.
*/
private void checkEnlistPossibility() {
- if (isFinishingOrFinished()) {
- // This means that the transaction is either in final or FINISHING
state.
- failEnlist();
+ if (isFinishingOrFinished() || killed) {
+ throw enlistFailedException();
}
}
@@ -220,14 +231,12 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
if (finishFuture == null) {
if (killed) {
if (isComplete) {
+ // An attempt to finish a killed transaction.
finishFuture = nullCompletedFuture();
- return failedFuture(new TransactionException(
- TX_ALREADY_FINISHED_ERR,
- format("Transaction is killed [{},
txState={}].",
- formatTxInfo(id(), txManager, false),
state())
- ));
+ return failedFuture(new
TransactionKilledException(id(), txManager));
} else {
+ // Kill is called twice.
return nullCompletedFuture();
}
}
@@ -239,6 +248,9 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
finishFuture = nullCompletedFuture();
this.timeoutExceeded = timeoutExceeded;
} else {
+ if (killClosure == null) {
+ throw new AssertionError("Invalid kill state for a
full transaction");
+ }
killed = true;
}
} else {
@@ -258,6 +270,16 @@ public class ReadWriteTransactionImpl extends
IgniteAbstractTransactionImpl {
this.timeoutExceeded = timeoutExceeded;
} else {
killed = true;
+
+ return finishFutureInternal.handle((unused, throwable)
-> {
+ // TODO
https://issues.apache.org/jira/browse/IGNITE-25825 move before finish after
async cleanup
+ if (killClosure != null) {
+ // Notify the client about the kill.
+ killClosure.accept(this);
+ }
+
+ return null;
+ });
}
// Return the real future first time.
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
index 3fee6424cf3..83934e5773a 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TransactionInflights.java
@@ -147,29 +147,6 @@ public class TransactionInflights {
return res[0];
}
- /**
- * Track the given RO transaction until finish.
- * Currently RO tracking is used to prevent unclosed cursors.
- *
- * @param txId The transaction id.
- * @return {@code True} if the was registered and is in active state.
- */
- public boolean trackReadOnly(UUID txId) {
- boolean[] res = {true};
-
- txCtxMap.compute(txId, (uuid, ctx) -> {
- if (ctx == null) {
- ctx = new ReadOnlyTxContext();
- }
-
- res[0] = !ctx.isTxFinishing();
-
- return ctx;
- });
-
- return res[0];
- }
-
/**
* Unregisters the inflight for a transaction.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
index fdd2a58c5a3..c08f55865ae 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestHandler.java
@@ -291,6 +291,36 @@ public class TxCleanupRequestHandler {
messagingService.send(sender, ChannelType.DEFAULT, prepareResponse(new
CleanupReplicatedInfo(txId, partitions)));
}
+ /**
+ * Discards local write intents.
+ *
+ * @param partitions Partitions.
+ * @param txId The transaction id.
+ *
+ * @return The future.
+ */
+ CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> partitions, UUID txId) {
+ Map<EnlistedPartitionGroup, CompletableFuture<?>> writeIntentSwitches
= IgniteUtils.newHashMap(partitions.size());
+
+ for (EnlistedPartitionGroup partition : partitions) {
+ CompletableFuture<?> future =
writeIntentSwitchProcessor.switchLocalWriteIntents(
+ partition,
+ txId,
+ false,
+ null
+ );
+
+ writeIntentSwitches.put(partition, future);
+ }
+
+ releaseTxLocks(txId);
+
+ remotelyTriggeredResourceRegistry.close(txId);
+
+ // We don't care about replicating discarded write intents state,
because it will be lazily resolved if needed.
+ return allOf(writeIntentSwitches.values().toArray(new
CompletableFuture<?>[0]));
+ }
+
private static class CleanupContext {
private final InternalClusterNode sender;
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
index c48d4e655fe..a8e41b8e6d5 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxCleanupRequestSender.java
@@ -203,7 +203,7 @@ public class TxCleanupRequestSender {
*/
public CompletableFuture<Void> cleanup(
@Nullable ZonePartitionId commitPartitionId,
- Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+ Map<ZonePartitionId, ? extends PartitionEnlistment>
enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
index 69ffa216e6a..0bd927e95cc 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/TxManagerImpl.java
@@ -494,7 +494,8 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
txId,
localNodeId,
implicit,
- timeout
+ timeout,
+ options.killClosure()
);
// Implicit transactions are finished as soon as their operation/query
is finished, they cannot be abandoned, so there is
@@ -1146,7 +1147,7 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
@Override
public CompletableFuture<Void> cleanup(
@Nullable ZonePartitionId commitPartitionId,
- Map<ZonePartitionId, PartitionEnlistment> enlistedPartitions,
+ Map<ZonePartitionId, ? extends PartitionEnlistment>
enlistedPartitions,
boolean commit,
@Nullable HybridTimestamp commitTimestamp,
UUID txId
@@ -1202,6 +1203,15 @@ public class TxManagerImpl implements TxManager,
NetworkMessageHandler, SystemVi
return falseCompletedFuture();
}
+ @Override
+ public CompletableFuture<Void>
discardLocalWriteIntents(List<EnlistedPartitionGroup> groups, UUID txId) {
+ return txCleanupRequestHandler.discardLocalWriteIntents(groups,
txId).handle((r, e) -> {
+ // We don't need tx state any more.
+ updateTxMeta(txId, old -> null);
+ return null;
+ });
+ }
+
@Override
public int lockRetryCount() {
return lockRetryCount;
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
index 631a6fd26dd..4a613e36c43 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/AbstractDeadlockPreventionTest.java
@@ -304,6 +304,24 @@ public abstract class AbstractDeadlockPreventionTest
extends AbstractLockingTest
assertThat(xlock(tx2, k), willSucceedFast());
}
+ @Test
+ public void testDeadlockRecovery() {
+ var tx1 = beginTx();
+ var tx2 = beginTx();
+
+ var k = key("test");
+
+ assertThat(slock(tx2, k), willSucceedFast());
+ assertThat(slock(tx1, k), willSucceedFast());
+
+ assertFutureFailsOrWaitsForTimeout(() -> xlock(tx2, k));
+
+ // Failed lock will be rolled to S state. We need to release it
manually.
+ release(tx2, k, LockMode.S);
+
+ assertThat(xlock(tx1, k), willSucceedFast());
+ }
+
/**
* This method checks lock future of conflicting transaction provided by
supplier, in a way depending on deadlock prevention policy.
* If the policy does not allow wait on conflict (wait timeout is equal to
{@code 0}) then the future must be failed with
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
index 4ce5d3a18a6..8ddf65e33b9 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadOnlyTransactionImplTest.java
@@ -49,8 +49,7 @@ class ReadOnlyTransactionImplTest extends
BaseIgniteAbstractTest {
new UUID(1, 2),
10_000,
readTimestamp,
- new CompletableFuture<>()
- );
+ new CompletableFuture<>());
assertThat(tx.schemaTimestamp(), is(readTimestamp));
}
diff --git
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
index 8a4c1a16864..51ab93f3819 100644
---
a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
+++
b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/impl/ReadWriteTransactionImplTest.java
@@ -80,8 +80,7 @@ class ReadWriteTransactionImplTest extends
BaseIgniteAbstractTest {
UUID txId =
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
var tx = new ReadWriteTransactionImpl(
- txManager, HybridTimestampTracker.atomicTracker(null), txId,
CLUSTER_NODE.id(), false, 10_000
- );
+ txManager, HybridTimestampTracker.atomicTracker(null), txId,
CLUSTER_NODE.id(), false, 10_000, null);
assertThat(tx.schemaTimestamp(), is(beginTs));
}
@@ -114,8 +113,7 @@ class ReadWriteTransactionImplTest extends
BaseIgniteAbstractTest {
UUID txId =
TestTransactionIds.TRANSACTION_ID_GENERATOR.transactionIdFor(beginTs);
var tx = new ReadWriteTransactionImpl(
- txManager, HybridTimestampTracker.atomicTracker(null), txId,
CLUSTER_NODE.id(), false, 10_000
- );
+ txManager, HybridTimestampTracker.atomicTracker(null), txId,
CLUSTER_NODE.id(), false, 10_000, null);
tx.assignCommitPartition(txCommitPart);