This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 4cd0fd51eec IGNITE-17838 Implement runInTransaction automatic retries
(#6413)
4cd0fd51eec is described below
commit 4cd0fd51eec2d04c889955a83bfc3f600f16a0bc
Author: Denis Chudov <[email protected]>
AuthorDate: Wed Aug 27 10:44:29 2025 +0300
IGNITE-17838 Implement runInTransaction automatic retries (#6413)
---
.../ignite/tx/IgniteTransactionDefaults.java} | 18 +-
.../org/apache/ignite/tx/IgniteTransactions.java | 112 ++++---
.../ignite/tx/RetriableTransactionException.java} | 15 +-
.../ignite/tx/RunInTransactionInternalImpl.java | 360 ++++++++++++++++++++
.../ignite/tx/RunInTransactionRetryTest.java | 367 +++++++++++++++++++++
.../internal/lang/ComponentStoppingException.java | 3 +-
.../internal/lang/NodeStoppingException.java | 3 +-
.../ignite/internal/util/FastTimestamps.java | 6 +-
.../network/UnresolvableConsistentIdException.java | 3 +-
.../PrimaryReplicaAwaitException.java | 3 +-
.../PrimaryReplicaAwaitTimeoutException.java | 3 +-
.../exception/PrimaryReplicaMissException.java | 4 +-
.../replicator/exception/ReplicationException.java | 3 +-
.../ItTxDistributedCleanupRecoveryTest.java | 4 +-
...tKeyValueBinaryViewApiExplicitRunInTxnTest.java | 3 +-
.../ignite/internal/table/TxAbstractTest.java | 87 ++---
.../ignite/internal/tx/InternalTxOptions.java | 5 +
.../apache/ignite/internal/tx/LockException.java | 4 +-
.../TransactionConfigurationSchema.java | 4 +-
.../tx/impl/PersistentTxStateVacuumizer.java | 4 +-
.../tx/impl/PrimaryReplicaExpiredException.java | 3 +-
21 files changed, 883 insertions(+), 131 deletions(-)
diff --git
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java
similarity index 59%
copy from
modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
copy to
modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java
index cfe4e181ed7..f6abdf37349 100644
---
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
+++
b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactionDefaults.java
@@ -15,18 +15,14 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network;
-
-import org.apache.ignite.lang.ErrorGroups.Network;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterNode;
+package org.apache.ignite.tx;
/**
- * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when
- * there is no node with such consistent ID in the physical topology).
+ * Utility class containing transaction default constants.
*/
-public class UnresolvableConsistentIdException extends IgniteException {
- public UnresolvableConsistentIdException(String msg) {
- super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
- }
+public final class IgniteTransactionDefaults {
+    /**
+     * Default timeout of read-write transactions, in seconds.
+     */
+    public static final long DEFAULT_RW_TX_TIMEOUT_SECONDS = 30;
+
+    private IgniteTransactionDefaults() {
+        // No-op: constants holder, not meant to be instantiated.
+    }
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
index 51cd9641647..0527e93e4d5 100644
--- a/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
+++ b/modules/api/src/main/java/org/apache/ignite/tx/IgniteTransactions.java
@@ -17,11 +17,13 @@
package org.apache.ignite.tx;
-import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.function.Function.identity;
+import static
org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS;
+import static
org.apache.ignite.tx.RunInTransactionInternalImpl.runInTransactionAsyncInternal;
+import static
org.apache.ignite.tx.RunInTransactionInternalImpl.runInTransactionInternal;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.ignite.table.Table;
@@ -115,7 +117,7 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>The correct variant will be:
+ * <p>The <b>correct</b> variant will be:
* <pre>
* {@code
* igniteTransactions.runInTransaction(tx -> {
@@ -125,7 +127,12 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>If the closure is executed normally (no exceptions) the transaction
is automatically committed.
+ * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed. In case of an exception, the
+ * closure will be retried automatically within the transaction timeout, so it must be a pure function: it may be executed several
+ * times. If the transaction timeout expires before the closure completes successfully and the transaction is committed, the
+ * transaction is rolled back instead.
+ * <br>
+ * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
+ * exceptions related to the primary replica change, etc.
*
* @param clo The closure.
*
@@ -153,7 +160,7 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>The correct variant will be:
+ * <p>The <b>correct</b> variant will be:
* <pre>
* {@code
* igniteTransactions.runInTransaction(tx -> {
@@ -163,7 +170,12 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>If the closure is executed normally (no exceptions) the transaction
is automatically committed.
+ * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed. In case of an exception, the
+ * closure will be retried automatically within the transaction timeout, so it must be a pure function: it may be executed several
+ * times. If the transaction timeout expires before the closure completes successfully and the transaction is committed, the
+ * transaction is rolled back instead.
+ * <br>
+ * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
+ * exceptions related to the primary replica change, etc.
*
* @param options Transaction options.
* @param clo The closure.
@@ -197,7 +209,7 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>The correct variant will be:
+ * <p>The <b>correct</b> variant will be:
* <pre>
* {@code
* igniteTransactions.runInTransaction(tx -> {
@@ -207,7 +219,12 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>If the closure is executed normally (no exceptions) the transaction
is automatically committed.
+ * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed. In case of an exception, the
+ * closure will be retried automatically within the transaction timeout, so it must be a pure function: it may be executed several
+ * times. If the transaction timeout expires before the closure completes successfully and the transaction is committed, the
+ * transaction is rolled back instead.
+ * <br>
+ * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
+ * exceptions related to the primary replica change, etc.
*
* @param clo Closure.
* @param <T> Closure result type.
@@ -237,7 +254,7 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>The correct variant will be:
+ * <p>The <b>correct</b> variant will be:
* <pre>
* {@code
* igniteTransactions.runInTransaction(tx -> {
@@ -247,7 +264,12 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>If the closure is executed normally (no exceptions) the transaction
is automatically committed.
+ * <p>If the closure is executed normally (no exceptions), the transaction is automatically committed. In case of an exception, the
+ * closure will be retried automatically within the transaction timeout, so it must be a pure function: it may be executed several
+ * times. If the transaction timeout expires before the closure completes successfully and the transaction is committed, the
+ * transaction is rolled back instead.
+ * <br>
+ * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
+ * exceptions related to the primary replica change, etc.
*
* @param clo The closure.
* @param options Transaction options.
@@ -257,26 +279,11 @@ public interface IgniteTransactions {
* @throws TransactionException If a transaction can't be finished
successfully.
*/
default <T> T runInTransaction(Function<Transaction, T> clo, @Nullable TransactionOptions options) throws TransactionException {
- Objects.requireNonNull(clo);
-
- Transaction tx = begin(options);
-
- try {
- T ret = clo.apply(tx);
-
- tx.commit();
-
- return ret;
- } catch (Throwable t) {
- // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries
- try {
- tx.rollback(); // Try rolling back on user exception.
- } catch (Exception e) {
- t.addSuppressed(e);
- }
-
- throw t;
- }
+        // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the
+        // timeout of possible retries.
+        long startTimestamp = System.currentTimeMillis();
+
+        // A non-positive timeout in the options is treated as "not set", so the retry window falls back to the default instead of
+        // silently disabling retries. NOTE(review): assumes TransactionOptions uses 0 to mean "use the default timeout" — confirm.
+        long initialTimeout = options == null || options.timeoutMillis() <= 0
+                ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS)
+                : options.timeoutMillis();
+
+        return runInTransactionInternal(this, clo, options, startTimestamp, initialTimeout);
}
/**
@@ -293,7 +300,12 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>If the asynchronous chain resulted in no exception, the commitAsync
will be automatically called.
+ * <p>If the asynchronous chain completes without an exception, commitAsync is called automatically. In case of an exception, the
+ * closure will be retried automatically within the transaction timeout, so it must be a pure function: it may be executed several
+ * times. If the transaction timeout expires before the closure completes successfully and the transaction is committed, the
+ * transaction is rolled back instead.
+ * <br>
+ * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
+ * exceptions related to the primary replica change, etc.
*
* @param clo The closure.
* @param <T> Closure result type.
@@ -317,7 +329,13 @@ public interface IgniteTransactions {
* }
* </pre>
*
- * <p>If the asynchronous chain resulted in no exception, the commitAsync
will be automatically called.
+ * <p>If the asynchronous chain completes without an exception, commitAsync is called automatically. In case of an exception, the
+ * closure will be retried automatically within the transaction timeout, so it must be a pure function: it may be executed several
+ * times. If the transaction timeout expires before the closure completes successfully and the transaction is committed, the
+ * transaction is rolled back instead.
+ * <br>
+ * The closure is retried only in cases of "expected" exceptions, like {@code LockException}, {@code TimeoutException},
+ * exceptions related to the primary replica change, etc.
*
* @param clo The closure.
* @param options Transaction options.
@@ -326,28 +344,12 @@ public interface IgniteTransactions {
*/
default <T> CompletableFuture<T> runInTransactionAsync(
Function<Transaction, CompletableFuture<T>> clo,
- @Nullable TransactionOptions options) {
- Objects.requireNonNull(clo);
-
- // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-17838 Implement auto retries
- return beginAsync(options).thenCompose(tx -> {
- try {
- return clo.apply(tx).handle((res, e) -> {
- if (e != null) {
- return tx.rollbackAsync().exceptionally(e0 -> {
- e.addSuppressed(e0);
- return null;
- }).thenCompose(ignored -> CompletableFuture.<T>failedFuture(e));
- }
-
- return completedFuture(res);
- }).thenCompose(identity()).thenCompose(val -> tx.commitAsync().thenApply(ignored -> val));
- } catch (Exception e) {
- return tx.rollbackAsync().exceptionally(e0 -> {
- e.addSuppressed(e0);
- return null;
- }).thenCompose(ignored -> CompletableFuture.failedFuture(e));
- }
- });
+            @Nullable TransactionOptions options
+    ) {
+        // This start timestamp is not related to transaction's begin timestamp and only serves as local time for counting the
+        // timeout of possible retries.
+        long startTimestamp = System.currentTimeMillis();
+
+        // A non-positive timeout in the options is treated as "not set", so the retry window falls back to the default instead of
+        // silently disabling retries. NOTE(review): assumes TransactionOptions uses 0 to mean "use the default timeout" — confirm.
+        long initialTimeout = options == null || options.timeoutMillis() <= 0
+                ? TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS)
+                : options.timeoutMillis();
+
+        // No suppressed exceptions yet: this is the first attempt.
+        return runInTransactionAsyncInternal(this, clo, options, startTimestamp, initialTimeout, null);
}
}
diff --git
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
b/modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java
similarity index 59%
copy from
modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
copy to
modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java
index cfe4e181ed7..0a50878901e 100644
---
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
+++
b/modules/api/src/main/java/org/apache/ignite/tx/RetriableTransactionException.java
@@ -15,18 +15,11 @@
* limitations under the License.
*/
-package org.apache.ignite.internal.network;
-
-import org.apache.ignite.lang.ErrorGroups.Network;
-import org.apache.ignite.lang.IgniteException;
-import org.apache.ignite.network.ClusterNode;
+package org.apache.ignite.tx;
/**
- * Thrown when consistent ID cannot be resolved to a {@link ClusterNode} instance (i.e. when
- * there is no node with such consistent ID in the physical topology).
+ * Marker interface for exceptions that can be retried when thrown from the closure passed to
+ * {@link IgniteTransactions#runInTransaction} or {@link IgniteTransactions#runInTransactionAsync}.
+ *
+ * <p>A failure is treated as retriable if the thrown exception, or any exception in its cause chain, implements this
+ * interface. Retries happen only while the transaction timeout has not expired.
*/
-public class UnresolvableConsistentIdException extends IgniteException {
- public UnresolvableConsistentIdException(String msg) {
- super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
- }
+public interface RetriableTransactionException {
+    // No methods: implementing this interface is the only signal.
}
diff --git
a/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java
b/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java
new file mode 100644
index 00000000000..c67797cc4ef
--- /dev/null
+++
b/modules/api/src/main/java/org/apache/ignite/tx/RunInTransactionInternalImpl.java
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx;
+
+import static java.util.Collections.synchronizedList;
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.function.Function.identity;
+import static
org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Default implementation of {@link IgniteTransactions#runInTransaction} and {@link IgniteTransactions#runInTransactionAsync},
+ * kept in a separate class so as not to overload the interface with implementation details. This implementation is common
+ * for both client and embedded {@link IgniteTransactions}.
+ *
+ * <p>A failed closure is retried in a new transaction while the retry window ({@code initialTimeout} milliseconds counted from
+ * {@code startTimestamp}) has not expired and the failure is retriable, i.e. the exception or one of its causes is a
+ * {@link TimeoutException} or implements {@link RetriableTransactionException}. Before each retry the failed transaction is
+ * rolled back, retrying the rollback itself until it succeeds or the window expires. Commit failures are not retried.
+ */
+class RunInTransactionInternalImpl {
+    /** Maximum number of exceptions collected across attempts and attached to the final exception as suppressed. */
+    private static final int MAX_SUPPRESSED = 100;
+
+    /**
+     * Runs the closure in a transaction with automatic retries and commits the transaction of the successful attempt.
+     *
+     * @param igniteTransactions Transactions facade used to begin transactions.
+     * @param clo Closure; it may be executed several times.
+     * @param options Transaction options, or {@code null} for the default timeout. The instance is never modified.
+     * @param startTimestamp Local wall-clock time, in milliseconds, from which the retry window is counted.
+     * @param initialTimeout Length of the retry window, in milliseconds.
+     * @param <T> Closure result type.
+     * @return Result of the successful closure invocation.
+     * @throws TransactionException If a transaction can't be finished successfully.
+     */
+    static <T> T runInTransactionInternal(
+            IgniteTransactions igniteTransactions,
+            Function<Transaction, T> clo,
+            @Nullable TransactionOptions options,
+            long startTimestamp,
+            long initialTimeout
+    ) throws TransactionException {
+        Objects.requireNonNull(clo);
+
+        TransactionOptions txOptions = options == null
+                ? new TransactionOptions().timeoutMillis(TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS))
+                : options;
+
+        List<Throwable> suppressed = new ArrayList<>();
+
+        Transaction tx;
+        T ret;
+
+        while (true) {
+            tx = igniteTransactions.begin(txOptions);
+
+            try {
+                ret = clo.apply(tx);
+
+                break;
+            } catch (Throwable ex) {
+                // Throwable, not Exception: an Error thrown by the closure must still roll the transaction back.
+                addSuppressedToList(suppressed, ex);
+
+                if (calcRemainingTime(initialTimeout, startTimestamp) > 0 && isRetriable(ex)) {
+                    // Rollback on user exception, should be retried until success or timeout to ensure the lock release
+                    // before the next attempt.
+                    rollbackWithRetry(tx, ex, startTimestamp, initialTimeout, suppressed);
+
+                    // Recomputed after the rollback, which may have consumed part of the retry window.
+                    long remaining = calcRemainingTime(initialTimeout, startTimestamp);
+
+                    if (remaining > 0) {
+                        // Will go on retry iteration. A copy is used so that the caller's options instance is not modified.
+                        txOptions = withTimeout(txOptions, remaining);
+                    } else {
+                        throwExceptionWithSuppressed(ex, suppressed);
+                    }
+                } else {
+                    try {
+                        // No retries here, rely on the durable finish.
+                        tx.rollback();
+                    } catch (Exception e) {
+                        addSuppressedToList(suppressed, e);
+                    }
+
+                    throwExceptionWithSuppressed(ex, suppressed);
+                }
+            }
+        }
+
+        try {
+            tx.commit();
+        } catch (Exception e) {
+            try {
+                // Try to rollback tx in case if it's not finished. Retry is not needed here due to the durable finish.
+                tx.rollback();
+            } catch (Exception re) {
+                e.addSuppressed(re);
+            }
+
+            throw e;
+        }
+
+        return ret;
+    }
+
+    /**
+     * Rolls the transaction back, repeating the attempt until it succeeds or the retry window expires. On expiry, throws
+     * {@code closureException} with the collected exceptions attached as suppressed.
+     */
+    private static void rollbackWithRetry(
+            Transaction tx,
+            Throwable closureException,
+            long startTimestamp,
+            long initialTimeout,
+            List<Throwable> suppressed
+    ) {
+        while (true) {
+            try {
+                tx.rollback();
+
+                break;
+            } catch (Exception re) {
+                addSuppressedToList(suppressed, re);
+
+                if (calcRemainingTime(initialTimeout, startTimestamp) <= 0) {
+                    throwExceptionWithSuppressed(closureException, suppressed);
+                }
+            }
+        }
+    }
+
+    /**
+     * Asynchronous counterpart of {@link #runInTransactionInternal}.
+     *
+     * @param igniteTransactions Transactions facade used to begin transactions.
+     * @param clo Closure; it may be executed several times and must not return {@code null}.
+     * @param options Transaction options, or {@code null} for the default timeout. The instance is never modified.
+     * @param startTimestamp Local wall-clock time, in milliseconds, from which the retry window is counted.
+     * @param initialTimeout Length of the retry window, in milliseconds.
+     * @param suppressed Exceptions collected by previous attempts, or {@code null} on the first attempt.
+     * @param <T> Closure result type.
+     * @return Future completed with the result of the successful attempt after its transaction is committed.
+     */
+    static <T> CompletableFuture<T> runInTransactionAsyncInternal(
+            IgniteTransactions igniteTransactions,
+            Function<Transaction, CompletableFuture<T>> clo,
+            @Nullable TransactionOptions options,
+            long startTimestamp,
+            long initialTimeout,
+            @Nullable List<Throwable> suppressed
+    ) {
+        Objects.requireNonNull(clo);
+
+        TransactionOptions txOptions = options == null
+                ? new TransactionOptions().timeoutMillis(TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS))
+                : options;
+
+        List<Throwable> sup = suppressed == null ? synchronizedList(new ArrayList<>()) : suppressed;
+
+        return igniteTransactions
+                .beginAsync(txOptions)
+                .thenCompose(tx -> {
+                    CompletableFuture<T> closureFuture;
+
+                    try {
+                        closureFuture = Objects.requireNonNull(clo.apply(tx), "The closure must not return null.");
+                    } catch (Throwable ex) {
+                        // Synchronous failures go through the same retry logic as failed futures.
+                        closureFuture = failedFuture(ex);
+                    }
+
+                    // On success, commit THIS transaction. On failure, handleClosureException rolls it back and either fails or
+                    // returns the result of a retry, whose own transaction is already committed: the rolled-back tx must not be
+                    // committed afterwards.
+                    return closureFuture
+                            .<CompletableFuture<T>>handle((res, e) -> e == null
+                                    ? commitWithRollbackOnFailure(tx, res)
+                                    : handleClosureException(igniteTransactions, tx, clo, txOptions, startTimestamp,
+                                            initialTimeout, sup, e))
+                            .thenCompose(identity());
+                });
+    }
+
+    /**
+     * Commits the transaction without retries. If the commit fails, tries to roll the transaction back and fails with the
+     * commit exception, carrying the rollback exception (if any) as suppressed.
+     */
+    private static <T> CompletableFuture<T> commitWithRollbackOnFailure(Transaction tx, T val) {
+        return tx.commitAsync()
+                .<CompletableFuture<T>>handle((ignored, e) -> {
+                    if (e == null) {
+                        return completedFuture(val);
+                    }
+
+                    // Rollback in case the tx is not finished; no retry is needed due to the durable finish.
+                    return tx.rollbackAsync()
+                            .handle((ign, re) -> {
+                                addSuppressed(e, re);
+
+                                return null;
+                            })
+                            // Rethrow commit exception.
+                            .thenCompose(ign -> CompletableFuture.<T>failedFuture(e));
+                })
+                .thenCompose(identity());
+    }
+
+    /**
+     * Handles a closure failure: rolls back {@code currentTx}, then either retries in a new transaction (the returned future
+     * then carries the already committed result of the retry) or fails with {@code e} carrying the collected exceptions.
+     */
+    private static <T> CompletableFuture<T> handleClosureException(
+            IgniteTransactions igniteTransactions,
+            Transaction currentTx,
+            Function<Transaction, CompletableFuture<T>> clo,
+            TransactionOptions txOptions,
+            long startTimestamp,
+            long initialTimeout,
+            List<Throwable> suppressed,
+            Throwable e
+    ) {
+        addSuppressedToList(suppressed, e);
+
+        if (calcRemainingTime(initialTimeout, startTimestamp) > 0 && isRetriable(e)) {
+            // Rollback on user exception, should be retried until success or timeout to ensure the lock release
+            // before the next attempt.
+            return rollbackWithRetryAsync(currentTx, startTimestamp, initialTimeout, suppressed, e)
+                    .thenCompose(ignored -> {
+                        // Recomputed after the rollback, which may have consumed part of the retry window.
+                        long remaining = calcRemainingTime(initialTimeout, startTimestamp);
+
+                        if (remaining <= 0) {
+                            return CompletableFuture.<T>failedFuture(attachSuppressed(e, suppressed));
+                        }
+
+                        // A copy is used so that the caller's options instance is not modified.
+                        return runInTransactionAsyncInternal(
+                                igniteTransactions,
+                                clo,
+                                withTimeout(txOptions, remaining),
+                                startTimestamp,
+                                initialTimeout,
+                                suppressed
+                        );
+                    });
+        }
+
+        // No retries here, rely on the durable finish.
+        return currentTx.rollbackAsync()
+                .exceptionally(re -> {
+                    addSuppressedToList(suppressed, re);
+
+                    return null;
+                })
+                .thenCompose(ignored -> CompletableFuture.<T>failedFuture(attachSuppressed(e, suppressed)));
+    }
+
+    /**
+     * Rolls the transaction back asynchronously, repeating the attempt until it succeeds or the retry window expires. On expiry,
+     * the returned future fails with {@code e} carrying the collected exceptions as suppressed.
+     */
+    private static CompletableFuture<Void> rollbackWithRetryAsync(
+            Transaction tx,
+            long startTimestamp,
+            long initialTimeout,
+            List<Throwable> suppressed,
+            Throwable e
+    ) {
+        return tx.rollbackAsync()
+                .<CompletableFuture<Void>>handle((ignored, re) -> {
+                    if (re == null) {
+                        return completedFuture(null);
+                    }
+
+                    addSuppressedToList(suppressed, re);
+
+                    if (calcRemainingTime(initialTimeout, startTimestamp) <= 0) {
+                        return failedFuture(attachSuppressed(e, suppressed));
+                    }
+
+                    return rollbackWithRetryAsync(tx, startTimestamp, initialTimeout, suppressed, e);
+                })
+                .thenCompose(identity());
+    }
+
+    /**
+     * Returns a new options instance with the read-only flag of {@code source} and the given timeout; {@code source} is left
+     * untouched. NOTE(review): keep in sync if TransactionOptions gains more fields.
+     */
+    private static TransactionOptions withTimeout(TransactionOptions source, long timeoutMillis) {
+        return new TransactionOptions()
+                .readOnly(source.readOnly())
+                .timeoutMillis(timeoutMillis);
+    }
+
+    /** Adds {@code a} to the list unless it already holds {@link #MAX_SUPPRESSED} elements. */
+    private static void addSuppressedToList(List<Throwable> to, Throwable a) {
+        // The lock makes check-then-add atomic for the synchronized list of the async path; it is uncontended in the sync path.
+        synchronized (to) {
+            if (to.size() < MAX_SUPPRESSED) {
+                to.add(a);
+            }
+        }
+    }
+
+    /** Adds {@code a} as suppressed to {@code to}, skipping nulls, self-suppression and overflow of {@link #MAX_SUPPRESSED}. */
+    private static void addSuppressed(Throwable to, Throwable a) {
+        if (to != null && a != null && to != a && to.getSuppressed().length < MAX_SUPPRESSED) {
+            to.addSuppressed(a);
+        }
+    }
+
+    /** Attaches all collected exceptions to {@code e} as suppressed and returns {@code e}. */
+    private static Throwable attachSuppressed(Throwable e, List<Throwable> suppressed) {
+        // Iterating a Collections.synchronizedList requires holding its lock.
+        synchronized (suppressed) {
+            for (Throwable t : suppressed) {
+                addSuppressed(e, t);
+            }
+        }
+
+        return e;
+    }
+
+    /** Throws {@code e} (even if checked) with all collected exceptions attached as suppressed. */
+    private static void throwExceptionWithSuppressed(Throwable e, List<Throwable> suppressed) {
+        sneakyThrow(attachSuppressed(e, suppressed));
+    }
+
+    private static boolean isRetriable(Throwable e) {
+        return hasCause(e,
+                TimeoutException.class,
+                RetriableTransactionException.class
+        );
+    }
+
+    /** Returns {@code true} if {@code e} or any exception in its cause chain is an instance of one of the classes. */
+    private static boolean hasCause(Throwable e, Class<?>... classes) {
+        Set<Throwable> processed = new HashSet<>();
+
+        Throwable cause = e;
+        while (cause != null) {
+            // Guards against cyclic cause chains.
+            if (!processed.add(cause)) {
+                break;
+            }
+
+            for (Class<?> cls : classes) {
+                if (cls.isAssignableFrom(cause.getClass())) {
+                    return true;
+                }
+            }
+
+            cause = cause.getCause();
+        }
+
+        return false;
+    }
+
+    private static long calcRemainingTime(long initialTimeout, long startTimestamp) {
+        return initialTimeout - (System.currentTimeMillis() - startTimestamp);
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <E extends Throwable> E sneakyThrow(Throwable e) throws E {
+        throw (E) e;
+    }
+}
diff --git
a/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java
b/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java
new file mode 100644
index 00000000000..c629eba22fd
--- /dev/null
+++
b/modules/api/src/test/java/org/apache/ignite/tx/RunInTransactionRetryTest.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.tx;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junitpioneer.jupiter.cartesian.ArgumentSets;
+import org.junitpioneer.jupiter.cartesian.CartesianTest;
+
+/**
+ * Tests {@link IgniteTransactions#runInTransaction} and {@link
IgniteTransactions#runInTransactionAsync} retries.
+ */
+public class RunInTransactionRetryTest {
+    private static final int SHORT_TIMEOUT_MILLIS = 100;
+
+    private long testStartTime;
+
+    @BeforeEach
+    public void setUp() {
+        testStartTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Tests the different scenarios of retries.
+     *
+     * @param async Whether {@code runInTransactionAsync} (rather than {@code runInTransaction}) is tested.
+     * @param closureFailureCount Number of closure invocations that fail ({@link Integer#MAX_VALUE} means always).
+     * @param commitFailureCount Number of commits that fail ({@link Integer#MAX_VALUE} means always).
+     * @param rollbackFailureCount Number of rollbacks that fail ({@link Integer#MAX_VALUE} means always).
+     * @param closureFailureType How the closure fails; only used in async mode.
+     */
+    @CartesianTest
+    @CartesianTest.MethodFactory("testRetriesArgFactory")
+    public void testRetries(
+            boolean async,
+            int closureFailureCount,
+            int commitFailureCount,
+            int rollbackFailureCount,
+            ClosureFailureType closureFailureType
+    ) {
+        var closureFailures = new AtomicInteger(closureFailureCount);
+        var igniteTransactions = new MockIgniteTransactions(commitFailureCount, rollbackFailureCount);
+
+        Supplier<CompletableFuture<Integer>> testClosure;
+
+        if (async) {
+            testClosure = () -> igniteTransactions.runInTransactionAsync(
+                    tx -> closureCall(tx, closureFailures, closureFailureType),
+                    withShortTimeout()
+            );
+        } else {
+            // closureFailureType is always SYNC in sync mode (closure shouldn't return future).
+            testClosure = () -> igniteTransactions.runInTransaction(
+                    (Function<Transaction, CompletableFuture<Integer>>) tx ->
+                            closureCall(tx, closureFailures, ClosureFailureType.SYNC_FAIL),
+                    withShortTimeout()
+            );
+        }
+
+        boolean requiresEventualSuccess = closureFailureCount < Integer.MAX_VALUE
+                // Commit failure can't be retried.
+                && commitFailureCount == 0
+                // Rollbacks should be retried until success or timeout, so the rollback must succeed before closure retry.
+                && (closureFailureCount == 0 || rollbackFailureCount < Integer.MAX_VALUE);
+
+        boolean syncFail = false;
+        Exception ex = null;
+
+        CompletableFuture<Integer> future = null;
+
+        try {
+            future = testClosure.get();
+        } catch (Exception e) {
+            syncFail = true;
+            ex = e;
+        }
+
+        if (!syncFail) {
+            try {
+                future.join();
+
+                // Closure succeeded, check that it's expected.
+                assertTrue(requiresEventualSuccess);
+            } catch (Exception e) {
+                ex = e;
+            }
+        }
+
+        if (requiresEventualSuccess) {
+            if (ex != null) {
+                // Report the real failure instead of an NPE on a null future (sync failure) or a second join exception.
+                fail("Expected eventual success, but the call failed.", ex);
+            }
+
+            assertEquals(42, future.join());
+        } else {
+            if (closureFailureCount == Integer.MAX_VALUE) {
+                // Had to retry until timed out.
+                checkTimeout();
+            }
+
+            assertNotNull(ex);
+
+            if (!async) {
+                assertTrue(syncFail);
+
+                if (commitFailureCount > 0) {
+                    if (closureFailureCount == Integer.MAX_VALUE || closureFailureCount > 0 && rollbackFailureCount == Integer.MAX_VALUE) {
+                        // Closure exception should be rethrown.
+                        assertThat(ex, instanceOf(FailedClosureTestException.class));
+                    } else {
+                        assertThat(ex, instanceOf(FailedCommitTestException.class));
+                    }
+                } else {
+                    assertThat(ex, instanceOf(FailedClosureTestException.class));
+                }
+            } else {
+                assertFalse(syncFail);
+
+                assertThat(ex, instanceOf(CompletionException.class));
+                assertThat(ex.getCause(), instanceOf(Exception.class));
+                Exception cause = (Exception) ex.getCause();
+
+                assertTrue(
+                        cause instanceof FailedClosureTestException || cause instanceof FailedCommitTestException
+                );
+            }
+        }
+    }
+
+    @SuppressWarnings("unused")
+    private static ArgumentSets testRetriesArgFactory() {
+        return ArgumentSets.argumentsForFirstParameter(true, false)
+                .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE)
+                .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE)
+                .argumentsForNextParameter(0, 5, 10, Integer.MAX_VALUE)
+                .argumentsForNextParameter(ClosureFailureType.SYNC_FAIL, ClosureFailureType.FUTURE_FAIL);
+    }
+
+    @Test
+    public void testNoRetryAfterTimeout() {
+        var igniteTransactions = new MockIgniteTransactions(0, 0);
+
+        AtomicBoolean closureInvoked = new AtomicBoolean();
+
+        assertThrows(
+                FailedClosureTestException.class,
+                () -> igniteTransactions.runInTransaction(
+                        (Consumer<Transaction>) tx -> {
+                            // The closure outlives the 1 ms timeout, so a second invocation would be a retry after timeout.
+                            assertFalse(closureInvoked.get());
+                            closureInvoked.set(true);
+                            sleep(100);
+                            throw new FailedClosureTestException();
+                        },
+                        new TransactionOptions().timeoutMillis(1)
+                )
+        );
+    }
+
+    @Test
+    public void testNoRetryAfterTimeoutAsync() {
+        var igniteTransactions = new MockIgniteTransactions(0, 0);
+
+        AtomicBoolean closureInvoked = new AtomicBoolean();
+
+        CompletableFuture<Integer> future = igniteTransactions.runInTransactionAsync(
+                tx -> {
+                    // The closure outlives the 1 ms timeout, so a second invocation would be a retry after timeout.
+                    assertFalse(closureInvoked.get());
+                    closureInvoked.set(true);
+                    sleep(100);
+                    throw new FailedClosureTestException();
+                },
+                new TransactionOptions().timeoutMillis(1)
+        );
+
+        try {
+            future.join();
+            fail();
+        } catch (Exception e) {
+            assertThat(e, instanceOf(CompletionException.class));
+            assertThat(e.getCause(), instanceOf(FailedClosureTestException.class));
+        }
+    }
+
+    private static TransactionOptions withShortTimeout() {
+        return new TransactionOptions().timeoutMillis(SHORT_TIMEOUT_MILLIS);
+    }
+
+    private void checkTimeout() {
+        long now = System.currentTimeMillis();
+        long duration = now - testStartTime;
+        // Assuming that at least 80% of timeout has passed (assuming currentTimeMillis inaccuracy).
+        assertThat("duration was: " + duration, duration, greaterThan((long) (SHORT_TIMEOUT_MILLIS * 0.8)));
+    }
+
+    /**
+     * Test closure: fails (in the way defined by {@code closureFailureType}) while {@code closureFailures} is positive,
+     * decrementing it on each failure, and returns a future completed with 42 afterwards.
+     */
+    private static CompletableFuture<Integer> closureCall(
+            Transaction tx,
+            AtomicInteger closureFailures,
+            ClosureFailureType closureFailureType
+    ) {
+        assertNotNull(tx);
+        assertFalse(isFinished(tx));
+
+        if (closureFailures.get() > 0) {
+            closureFailures.decrementAndGet();
+
+            if (closureFailureType == ClosureFailureType.SYNC_FAIL) {
+                throw new FailedClosureTestException();
+            } else if (closureFailureType == ClosureFailureType.FUTURE_FAIL) {
+                return failedFuture(new FailedClosureTestException());
+            } else {
+                throw new AssertionError("unknown failure type");
+            }
+        } else {
+            return completedFuture(42);
+        }
+    }
+
+    private static boolean isFinished(Transaction tx) {
+        assertInstanceOf(MockTransaction.class, tx);
+        return ((MockTransaction) tx).finished;
+    }
+
+    private static void sleep(long duration) {
+        try {
+            Thread.sleep(duration);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before failing so that it is not lost.
+            Thread.currentThread().interrupt();
+
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * If closure should fail, this enum defines the type of failure.
+     */
+    private enum ClosureFailureType {
+        /** Closure throws exception. */
+        SYNC_FAIL,
+
+        /** Closure returns failed future. */
+        FUTURE_FAIL
+    }
+
+    /** Creates {@link MockTransaction}s that share the same commit and rollback failure counters. */
+    private static class MockIgniteTransactions implements IgniteTransactions {
+        final AtomicInteger commitsToFail;
+        final AtomicInteger rollbacksToFail;
+
+        MockIgniteTransactions(int commitsToFail, int rollbacksToFail) {
+            this.commitsToFail = new AtomicInteger(commitsToFail);
+            this.rollbacksToFail = new AtomicInteger(rollbacksToFail);
+        }
+
+        @Override
+        public Transaction begin(@Nullable TransactionOptions options) {
+            return new MockTransaction(commitsToFail, rollbacksToFail);
+        }
+
+        @Override
+        public CompletableFuture<Transaction> beginAsync(@Nullable TransactionOptions options) {
+            return completedFuture(begin(options));
+        }
+    }
+
+    /** Transaction whose commit/rollback fail while the corresponding shared counter is positive. */
+    private static class MockTransaction implements Transaction {
+        final AtomicInteger commitsToFail;
+        final AtomicInteger rollbacksToFail;
+        boolean finished;
+
+        MockTransaction(AtomicInteger commitsToFail, AtomicInteger rollbacksToFail) {
+            this.commitsToFail = commitsToFail;
+            this.rollbacksToFail = rollbacksToFail;
+        }
+
+        @Override
+        public void commit() throws TransactionException {
+            try {
+                commitAsync().join();
+            } catch (CompletionException e) {
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getCause();
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> commitAsync() {
+            sleep(1);
+
+            if (commitsToFail.get() > 0) {
+                commitsToFail.decrementAndGet();
+                return failedFuture(new FailedCommitTestException());
+            } else {
+                finished = true;
+                return completedFuture(null);
+            }
+        }
+
+        @Override
+        public void rollback() throws TransactionException {
+            try {
+                rollbackAsync().join();
+            } catch (CompletionException e) {
+                if (e.getCause() instanceof RuntimeException) {
+                    throw (RuntimeException) e.getCause();
+                } else {
+                    throw e;
+                }
+            }
+        }
+
+        @Override
+        public CompletableFuture<Void> rollbackAsync() {
+            sleep(1);
+
+            if (rollbacksToFail.get() > 0) {
+                rollbacksToFail.decrementAndGet();
+                return failedFuture(new FailedRollbackTestException());
+            } else {
+                finished = true;
+                return completedFuture(null);
+            }
+        }
+
+        @Override
+        public boolean isReadOnly() {
+            return false;
+        }
+    }
+
+    private static class FailedClosureTestException extends RuntimeException implements RetriableTransactionException {
+        // No-op.
+    }
+
+    private static class FailedCommitTestException extends RuntimeException implements RetriableTransactionException {
+        // No-op.
+    }
+
+    private static class FailedRollbackTestException extends RuntimeException implements RetriableTransactionException {
+        // No-op.
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
b/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
index 02e7759ae49..06354bfe440 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/lang/ComponentStoppingException.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.lang;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import java.util.UUID;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
/**
@@ -27,7 +28,7 @@ import org.jetbrains.annotations.Nullable;
* This is different from {@link NodeStoppingException} as {@link
ComponentStoppingException} might mean that just the component is stopped,
* not the whole node.
*/
-public class ComponentStoppingException extends IgniteInternalCheckedException
{
+public class ComponentStoppingException extends IgniteInternalCheckedException
implements RetriableTransactionException {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
b/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
index 8983ab6e92a..35fd72fc99f 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/lang/NodeStoppingException.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.lang;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import java.util.UUID;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
/**
* This exception is used to indicate that Ignite node is stopping (already
stopped) for some reason.
*/
-public class NodeStoppingException extends IgniteInternalCheckedException {
+public class NodeStoppingException extends IgniteInternalCheckedException
implements RetriableTransactionException {
/** Serial version UID. */
private static final long serialVersionUID = 0L;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
index 000d8c1e537..2a6019bc405 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/FastTimestamps.java
@@ -42,7 +42,11 @@ public class FastTimestamps {
});
Runnable updaterTask = () -> {
- coarseCurrentTimeMillis = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
+
+ if (now > coarseCurrentTimeMillis) {
+ coarseCurrentTimeMillis = now;
+ }
// Safe-point-friendly hint.
Thread.onSpinWait();
diff --git
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
index cfe4e181ed7..9057f16c764 100644
---
a/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
+++
b/modules/network-api/src/main/java/org/apache/ignite/internal/network/UnresolvableConsistentIdException.java
@@ -20,12 +20,13 @@ package org.apache.ignite.internal.network;
import org.apache.ignite.lang.ErrorGroups.Network;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.tx.RetriableTransactionException;
/**
* Thrown when consistent ID cannot be resolved to a {@link ClusterNode}
instance (i.e. when
* there is no node with such consistent ID in the physical topology).
*/
-public class UnresolvableConsistentIdException extends IgniteException {
+public class UnresolvableConsistentIdException extends IgniteException
implements RetriableTransactionException {
public UnresolvableConsistentIdException(String msg) {
super(Network.UNRESOLVABLE_CONSISTENT_ID_ERR, msg);
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
index 5de7483175f..39ae5402dda 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
@@ -22,12 +22,13 @@ import static
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
/**
* The exception is thrown when a primary replica await process has failed.
Please pay attention that there is a specific
* {@link PrimaryReplicaAwaitTimeoutException} for the primary replica await
timeout.
*/
-public class PrimaryReplicaAwaitException extends IgniteInternalException {
+public class PrimaryReplicaAwaitException extends IgniteInternalException
implements RetriableTransactionException {
private static final long serialVersionUID = 1029917546884926160L;
/**
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
index 35045f9b069..633a2d93cdc 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
@@ -22,12 +22,13 @@ import static
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
/**
* The exception is thrown when a primary replica await process has times out.
*/
-public class PrimaryReplicaAwaitTimeoutException extends
IgniteInternalException {
+public class PrimaryReplicaAwaitTimeoutException extends
IgniteInternalException implements RetriableTransactionException {
private static final long serialVersionUID = -1450288033816499192L;
/**
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
index 19ef18868b3..39dfff38cb9 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/PrimaryReplicaMissException.java
@@ -22,12 +22,14 @@ import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
/**
* Unchecked exception that is thrown when a replica is not the current
primary replica.
*/
-public class PrimaryReplicaMissException extends IgniteInternalException
implements ExpectedReplicationException {
+public class PrimaryReplicaMissException extends IgniteInternalException
implements ExpectedReplicationException,
+ RetriableTransactionException {
private static final long serialVersionUID = 8755220779942651494L;
/**
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
index f6ead96f171..57a18d868d3 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicationException.java
@@ -22,11 +22,12 @@ import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
import java.util.UUID;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
/**
* The exception is thrown when some issue happened during a replication.
*/
-public class ReplicationException extends IgniteInternalException {
+public class ReplicationException extends IgniteInternalException implements
RetriableTransactionException {
/**
* Constructor.
*
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
index e7f9ab5307a..0deed062641 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedCleanupRecoveryTest.java
@@ -47,7 +47,6 @@ public class ItTxDistributedCleanupRecoveryTest extends
TxAbstractTest {
@BeforeEach
@Override
public void before() throws Exception {
- // The value of 3 is less than the allowed number of cleanup retries.
setDefaultRetryCount(3);
txTestCluster = new ItTxTestCluster(
@@ -74,10 +73,11 @@ public class ItTxDistributedCleanupRecoveryTest extends
TxAbstractTest {
DefaultMessagingService messagingService =
(DefaultMessagingService) clusterService.messagingService();
messagingService.dropMessages((s, networkMessage) -> {
if (networkMessage instanceof TxCleanupMessage &&
defaultRetryCount.getAndDecrement() > 0) {
- logger().info("Dropping cleanup request: {}",
networkMessage);
+ logger().info("Dropping cleanup request [message={}].",
networkMessage);;
return true;
}
+
return false;
});
});
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
index 67c7642d103..48200c1de88 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/ItKeyValueBinaryViewApiExplicitRunInTxnTest.java
@@ -22,6 +22,7 @@ import org.apache.ignite.Ignite;
import org.apache.ignite.table.KeyValueView;
import org.apache.ignite.table.Tuple;
import org.apache.ignite.tx.Transaction;
+import org.apache.ignite.tx.TransactionOptions;
import org.junit.jupiter.api.function.Executable;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
@@ -54,7 +55,7 @@ public class ItKeyValueBinaryViewApiExplicitRunInTxnTest
extends ItKeyValueBinar
private static class TxTestCase extends TestCase {
@Override
protected Executable wrap(Consumer<Transaction> run) {
- return () -> ignite.transactions().runInTransaction(run);
+ return () -> ignite.transactions().runInTransaction(run, new
TransactionOptions().timeoutMillis(1));
}
TxTestCase(boolean async, boolean thin, KeyValueView<Tuple, Tuple>
view, TestTableDefinition tableDefinition, Ignite ignite) {
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
index 58f3da7f5c4..55c6281a3be 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/internal/table/TxAbstractTest.java
@@ -533,21 +533,24 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
RecordView<Tuple> view = accounts.recordView();
view.upsert(null, makeValue(1, balance));
- CompletableFuture<Double> fut0 =
igniteTransactions.runInTransactionAsync(tx -> {
- CompletableFuture<Double> fut = view.getAsync(tx, makeKey(1))
- .thenCompose(val2 -> {
- double prev = val2.doubleValue("balance");
- return view.upsertAsync(tx, makeValue(1, delta +
20)).thenApply(ignored -> prev);
- });
-
- fut.join();
-
- if (true) {
- throw new IllegalArgumentException();
- }
+ CompletableFuture<Double> fut0 =
igniteTransactions.runInTransactionAsync(
+ tx -> {
+ CompletableFuture<Double> fut = view.getAsync(tx,
makeKey(1))
+ .thenCompose(val2 -> {
+ double prev = val2.doubleValue("balance");
+ return view.upsertAsync(tx, makeValue(1,
delta + 20))
+ .thenApply(ignored -> prev);
+ })
+ .whenComplete((res, ex) -> log.info("Test: tx
operations in tx closures completed, ex=" + ex));
+
+ if (true) {
+ throw new IllegalArgumentException();
+ }
- return fut;
- });
+ return fut;
+ },
+ new TransactionOptions().timeoutMillis(1000)
+ );
var err = assertThrows(CompletionException.class, fut0::join);
@@ -567,13 +570,15 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
public void testTxClosureUncaughtExceptionInChainAsync() {
RecordView<Tuple> view = accounts.recordView();
- CompletableFuture<Double> fut0 =
igniteTransactions.runInTransactionAsync(tx -> {
- return view.getAsync(tx, makeKey(2))
- .thenCompose(val2 -> {
- double prev = val2.doubleValue("balance"); // val2 is
null - NPE is thrown here
- return view.upsertAsync(tx, makeValue(1,
100)).thenApply(ignored -> prev);
- });
- });
+ CompletableFuture<Double> fut0 = igniteTransactions
+ .runInTransactionAsync(
+ tx -> view.getAsync(tx, makeKey(2))
+ .thenCompose(val2 -> {
+ double prev = val2.doubleValue("balance"); //
val2 is null - NPE is thrown here
+ return view.upsertAsync(tx, makeValue(1,
100)).thenApply(ignored -> prev);
+ }),
+ new TransactionOptions().timeoutMillis(1000)
+ );
var err = assertThrows(CompletionException.class, fut0::join);
@@ -644,7 +649,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
var futUpd2 = table2.upsertAllAsync(tx1, rows2);
- assertTrue(IgniteTestUtils.waitForCondition(() -> {
+ assertTrue(waitForCondition(() -> {
boolean lockUpgraded = false;
for (Iterator<Lock> it =
txManager(accounts).lockManager().locks(tx1.id()); it.hasNext(); ) {
@@ -926,12 +931,18 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
accounts.recordView().upsert(null, makeValue(2, 100.));
- assertThrows(RuntimeException.class, () ->
igniteTransactions.runInTransaction((Consumer<Transaction>) tx -> {
- assertNotNull(accounts.recordView().get(tx, key2));
- assertTrue(accounts.recordView().delete(tx, key2));
- assertNull(accounts.recordView().get(tx, key2));
- throw new RuntimeException(); // Triggers rollback.
- }));
+ assertThrows(
+ RuntimeException.class,
+ () -> igniteTransactions.runInTransaction(
+ (Consumer<Transaction>) tx -> {
+ assertNotNull(accounts.recordView().get(tx,
key2));
+ assertTrue(accounts.recordView().delete(tx,
key2));
+ assertNull(accounts.recordView().get(tx,
key2));
+ throw new RuntimeException(); // Triggers
rollback.
+ },
+ new TransactionOptions().timeoutMillis(1000)
+ )
+ );
assertNotNull(accounts.recordView().get(null, key2));
assertTrue(accounts.recordView().delete(null, key2));
@@ -997,7 +1008,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
validateBalance(txAcc2.getAll(tx1, List.of(makeKey(2), makeKey(1))),
200., 300.);
validateBalance(txAcc2.getAll(tx1, List.of(makeKey(1), makeKey(2))),
300., 200.);
- assertTrue(IgniteTestUtils.waitForCondition(() -> TxState.ABORTED ==
tx2.state(), 5_000), tx2.state().toString());
+ assertTrue(waitForCondition(() -> TxState.ABORTED == tx2.state(),
5_000), tx2.state().toString());
tx1.commit();
@@ -1017,7 +1028,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
if (true) {
throw new IgniteException(INTERNAL_ERR, "Test error");
}
- }));
+ }, new TransactionOptions().timeoutMillis(1000)));
assertNull(accounts.recordView().get(null, makeKey(3)));
assertNull(accounts.recordView().get(null, makeKey(4)));
@@ -1243,8 +1254,8 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test2", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(customers).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
+ assertTrue(waitForCondition(() -> lockManager(customers).isEmpty(),
10_000));
}
@Test
@@ -1282,7 +1293,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test2", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
}
@Test
@@ -1320,7 +1331,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test2", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
}
@Test
@@ -1338,7 +1349,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test2", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
}
@Test
@@ -1356,7 +1367,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
}
@Test
@@ -1375,7 +1386,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test2", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(200., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
}
@Test
@@ -1394,7 +1405,7 @@ public abstract class TxAbstractTest extends
TxInfrastructureTest {
assertEquals("test", customers.recordView().get(null,
makeKey(1)).stringValue("name"));
assertEquals(100., accounts.recordView().get(null,
makeKey(1)).doubleValue("balance"));
- assertTrue(IgniteTestUtils.waitForCondition(() ->
lockManager(accounts).isEmpty(), 10_000));
+ assertTrue(waitForCondition(() -> lockManager(accounts).isEmpty(),
10_000));
}
@Test
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
index 9586a268419..35118be5896 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/InternalTxOptions.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.tx;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import
org.apache.ignite.internal.tx.configuration.TransactionConfigurationSchema;
import org.jetbrains.annotations.Nullable;
/**
@@ -76,6 +77,10 @@ public class InternalTxOptions {
public static class Builder {
private TxPriority priority = TxPriority.NORMAL;
+ /**
+ * This is NOT actually used as the default timeout, see defaults for
{@link TransactionConfigurationSchema#readOnlyTimeoutMillis}
+ * and {@link TransactionConfigurationSchema#readWriteTimeoutMillis}
which are actually used if tx timeout is 0.
+ */
private long timeoutMillis = 0;
@Nullable
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
index ab1919a5375..0e4d0760f18 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/LockException.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.tx;
+import org.apache.ignite.tx.RetriableTransactionException;
+
/**
* This exception is thrown when a lock cannot be acquired, released or
downgraded.
*/
-public class LockException extends TransactionInternalCheckedException {
+public class LockException extends TransactionInternalCheckedException
implements RetriableTransactionException {
/**
* Creates a new instance of LockException with the given message.
*
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
index 6af2318c298..953a14515fe 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/configuration/TransactionConfigurationSchema.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.tx.configuration;
+import static
org.apache.ignite.tx.IgniteTransactionDefaults.DEFAULT_RW_TX_TIMEOUT_SECONDS;
+
import java.util.concurrent.TimeUnit;
import org.apache.ignite.configuration.annotation.Config;
import org.apache.ignite.configuration.annotation.ConfigValue;
@@ -39,7 +41,7 @@ public class TransactionConfigurationSchema {
@Range(min = 1)
@Value(hasDefault = true)
@PublicName(legacyNames = "readWriteTimeout")
- public final long readWriteTimeoutMillis = TimeUnit.SECONDS.toMillis(30);
+ public final long readWriteTimeoutMillis =
TimeUnit.SECONDS.toMillis(DEFAULT_RW_TX_TIMEOUT_SECONDS);
// Deprecated properties
/** How often abandoned transactions are searched for (milliseconds). */
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 9d4dd6b8b8f..ae4f2d36719 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -180,6 +180,7 @@ public class PersistentTxStateVacuumizer {
return hasCause(e,
PrimaryReplicaMissException.class,
NodeStoppingException.class,
+ ComponentStoppingException.class,
GroupOverloadedException.class,
// AwaitReplicaTimeoutException can be thrown from
ReplicaService on receiver node, when there
// is no replica. This may happen if it was removed after
getting the primary replica but before the message was received
@@ -189,8 +190,7 @@ public class PersistentTxStateVacuumizer {
// the persistent tx state.
// Also, replica calls from PersistentTxStateVacuumizer are
local, so retry with new primary replica most likely will
// happen on another node.
- AwaitReplicaTimeoutException.class,
- ComponentStoppingException.class
+ AwaitReplicaTimeoutException.class
);
}
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
index 923a6437467..92130f03f70 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PrimaryReplicaExpiredException.java
@@ -23,10 +23,11 @@ import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.tx.RetriableTransactionException;
import org.jetbrains.annotations.Nullable;
/** Unchecked exception that is thrown when primary replica has expired. */
-public class PrimaryReplicaExpiredException extends IgniteInternalException {
+public class PrimaryReplicaExpiredException extends IgniteInternalException
implements RetriableTransactionException {
/**
* The constructor.
*