This is an automated email from the ASF dual-hosted git repository.

ascherbakov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c25c060763 IGNITE-22130 Fix retries logic.
c25c060763 is described below

commit c25c060763a17dcf2a746d6c5d7bd4c150c391eb
Author: Alexey Scherbakov <alexey.scherbak...@gmail.com>
AuthorDate: Mon May 13 16:50:28 2024 +0300

    IGNITE-22130 Fix retries logic.
---
 .../ignite/internal/util/ExceptionUtils.java       |  24 ++
 .../ignite/internal/replicator/ReplicaService.java |  36 ++-
 .../runner/app/ItIgniteNodeRestartTest.java        |   3 +-
 .../ignite/internal/table/ItTableScanTest.java     |   6 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java |   3 +-
 .../rebalance/ItRebalanceDistributedTest.java      |   3 +-
 .../distributed/storage/InternalTableImpl.java     | 243 +++++++++++----------
 .../apache/ignite/distributed/ItTxTestCluster.java |   7 +-
 .../tx/TransactionExceptionMapperProvider.java     |  46 ++++
 9 files changed, 240 insertions(+), 131 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index dbad400afa..63a365689c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -567,6 +567,30 @@ public final class ExceptionUtils {
         return INTERNAL_ERR;
     }
 
+    /**
+     * Determines whether the error code of a particular error matches any of the passed error codes.
+     *
+     * @param t Unwrapped throwable.
+     * @param code The code.
+     * @param codes Other codes.
+     * @return {@code true} if the error code extracted from {@code t} equals {@code code} or any of {@code codes}.
+     */
+    public static boolean matchAny(Throwable t, int code, int... codes) {
+        int errCode = extractCodeFrom(t);
+
+        if (code == errCode) {
+            return true;
+        }
+
+        for (int c0 : codes) {
+            if (c0 == errCode) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     // TODO: https://issues.apache.org/jira/browse/IGNITE-19870
     // This method should be removed or re-worked and usages should be changed 
to IgniteExceptionMapperUtil.mapToPublicException.
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 245b3b4bdb..a174eae3b1 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -17,17 +17,22 @@
 
 package org.apache.ignite.internal.replicator;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -45,10 +50,14 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.replicator.message.ReplicaResponse;
 import org.apache.ignite.internal.replicator.message.TimestampAware;
 import org.apache.ignite.network.ClusterNode;
+import org.jetbrains.annotations.Nullable;
 import org.jetbrains.annotations.TestOnly;
 
 /** The service is intended to execute requests on replicas. */
 public class ReplicaService {
+    /** Retry timeout. */
+    private static final int RETRY_TIMEOUT_MILLIS = 10;
+
     /** Message service. */
     private final MessagingService messagingService;
 
@@ -59,6 +68,8 @@ public class ReplicaService {
 
     private final ReplicationConfiguration replicationConfiguration;
 
+    private @Nullable final ScheduledExecutorService retryExecutor;
+
     /** Requests to retry. */
     private final Map<String, CompletableFuture<NetworkMessage>> 
pendingInvokes = new ConcurrentHashMap<>();
 
@@ -82,7 +93,8 @@ public class ReplicaService {
                 messagingService,
                 clock,
                 ForkJoinPool.commonPool(),
-                replicationConfiguration
+                replicationConfiguration,
+                null
         );
     }
 
@@ -93,24 +105,27 @@ public class ReplicaService {
      * @param clock A hybrid logical clock.
      * @param partitionOperationsExecutor Partition operation executor.
      * @param replicationConfiguration Replication configuration.
+     * @param retryExecutor Retry executor.
      */
     public ReplicaService(
             MessagingService messagingService,
             HybridClock clock,
             Executor partitionOperationsExecutor,
-            ReplicationConfiguration replicationConfiguration
+            ReplicationConfiguration replicationConfiguration,
+            @Nullable ScheduledExecutorService retryExecutor
     ) {
         this.messagingService = messagingService;
         this.clock = clock;
         this.partitionOperationsExecutor = partitionOperationsExecutor;
         this.replicationConfiguration = replicationConfiguration;
+        this.retryExecutor = retryExecutor;
     }
 
     /**
      * Sends request to the replica node.
      *
      * @param targetNodeConsistentId A consistent id of the replica node..
-     * @param req  Replica request.
+     * @param req Replica request.
      * @return Response future with either evaluation result or completed 
exceptionally.
      * @see NodeStoppingException If either supplier or demander node is 
stopping.
      * @see ReplicaUnavailableException If replica with given replication 
group id doesn't exist or not started yet.
@@ -210,7 +225,14 @@ public class ReplicaService {
                             return null;
                         });
                     } else {
-                        res.completeExceptionally(errResp.throwable());
+                        if (retryExecutor != null && 
matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) 
{
+                            retryExecutor.schedule(
+                                    // Need to resubmit again to pool which is 
valid for synchronous IO execution.
+                                    () -> 
partitionOperationsExecutor.execute(() -> 
res.completeExceptionally(errResp.throwable())),
+                                    RETRY_TIMEOUT_MILLIS, MILLISECONDS);
+                        } else {
+                            res.completeExceptionally(errResp.throwable());
+                        }
                     }
                 } else {
                     res.complete((R) ((ReplicaResponse) response).result());
@@ -224,7 +246,7 @@ public class ReplicaService {
     /**
      * Sends a request to the given replica {@code node} and returns a future 
that will be completed with a result of request processing.
      *
-     * @param node    Replica node.
+     * @param node Replica node.
      * @param request Request.
      * @return Response future with either evaluation result or completed 
exceptionally.
      * @see NodeStoppingException If either supplier or demander node is 
stopping.
@@ -252,8 +274,8 @@ public class ReplicaService {
     /**
      * Sends a request to the given replica {@code node} and returns a future 
that will be completed with a result of request processing.
      *
-     * @param node      Replica node.
-     * @param request   Request.
+     * @param node Replica node.
+     * @param request Request.
      * @param storageId Storage id.
      * @return Response future with either evaluation result or completed 
exceptionally.
      * @see NodeStoppingException If either supplier or demander node is 
stopping.
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index cdb511f090..0c0ead5bf1 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -386,7 +386,8 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 messagingServiceReturningToStorageOperationsPool,
                 hybridClock,
                 threadPoolsManager.partitionOperationsExecutor(),
-                replicationConfiguration
+                replicationConfiguration,
+                threadPoolsManager.commonScheduler()
         );
 
         var lockManager = new HeapLockManager();
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index d1b31ab596..9fd16e4dc5 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -135,7 +135,11 @@ public class ItTableScanTest extends 
BaseSqlIntegrationTest {
     private void checkResourcesAreReleased(IgniteImpl ignite) {
         checkCursorsAreClosed(ignite);
 
-        assertTrue(ignite.txManager().lockManager().isEmpty());
+        try {
+            assertTrue(waitForCondition(() -> 
ignite.txManager().lockManager().isEmpty(), 1000));
+        } catch (InterruptedException e) {
+            fail("Unexpected interruption");
+        }
     }
 
     /**
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 8503e0f529..cf3d4ca1b0 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -610,7 +610,8 @@ public class IgniteImpl implements Ignite {
                 messagingServiceReturningToStorageOperationsPool,
                 clock,
                 threadPoolsManager.partitionOperationsExecutor(),
-                replicationConfig
+                replicationConfig,
+                threadPoolsManager.commonScheduler()
         );
 
         LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = 
partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig);
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 0c742a9801..e08d866cd0 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1098,7 +1098,8 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     clusterService.messagingService(),
                     hybridClock,
                     threadPoolsManager.partitionOperationsExecutor(),
-                    replicationConfiguration
+                    replicationConfiguration,
+                    threadPoolsManager.commonScheduler()
             );
 
             var resourcesRegistry = new RemotelyTriggeredResourceRegistry();
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 8ea498f868..9692a2f908 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -21,6 +21,7 @@ import static 
it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
 import static java.util.concurrent.TimeUnit.SECONDS;
+import static java.util.function.Function.identity;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL;
 import static 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET;
@@ -29,19 +30,20 @@ import static 
org.apache.ignite.internal.table.distributed.storage.RowBatch.allR
 import static 
org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.matchAny;
+import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static 
org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR;
-import static 
org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR;
 
 import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
 import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
-import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.BitSet;
@@ -52,12 +54,10 @@ import java.util.List;
 import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.BiFunction;
@@ -75,8 +75,6 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
-import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
-import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
 import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.BinaryRowEx;
@@ -101,16 +99,13 @@ import 
org.apache.ignite.internal.table.distributed.replication.request.SwapRowR
 import 
org.apache.ignite.internal.table.distributed.replicator.action.RequestType;
 import org.apache.ignite.internal.tx.HybridTimestampTracker;
 import org.apache.ignite.internal.tx.InternalTransaction;
-import org.apache.ignite.internal.tx.LockException;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.util.CollectionUtils;
-import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.internal.utils.PrimaryReplica;
-import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.network.ClusterNodeResolver;
 import org.apache.ignite.tx.TransactionException;
@@ -365,16 +360,16 @@ public class InternalTableImpl implements InternalTable {
                 if (implicit) {
                     long ts = (txStartTs == null) ? 
actualTx.startTimestamp().getPhysical() : txStartTs;
 
-                    if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis() 
- ts < implicitTransactionTimeout) {
+                    if (exceptionAllowsImplicitTxRetry(e) && 
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
                         return enlistInTx(row, null, fac, noWriteChecker, ts);
                     }
                 }
 
-                throw wrapReplicationException(e);
+                sneakyThrow(e);
             }
 
             return completedFuture(r);
-        }).thenCompose(x -> x);
+        }).thenCompose(identity());
     }
 
     /**
@@ -484,16 +479,16 @@ public class InternalTableImpl implements InternalTable {
                 if (implicit) {
                     long ts = (txStartTs == null) ? 
actualTx.startTimestamp().getPhysical() : txStartTs;
 
-                    if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis() 
- ts < implicitTransactionTimeout) {
+                    if (exceptionAllowsImplicitTxRetry(e) && 
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
                         return enlistInTx(keyRows, null, fac, reducer, 
noOpChecker, ts);
                     }
                 }
 
-                throw wrapReplicationException(e);
+                sneakyThrow(e);
             }
 
             return completedFuture(r);
-        }).thenCompose(x -> x);
+        }).thenCompose(identity());
     }
 
     private InternalTransaction startImplicitRwTxIfNeeded(@Nullable 
InternalTransaction tx) {
@@ -626,50 +621,71 @@ public class InternalTableImpl implements InternalTable {
                 || request instanceof MultipleRowPkReplicaRequest && 
((MultipleRowPkReplicaRequest) request).requestType() != RW_GET_ALL
                 || request instanceof SwapRowReplicaRequest;
 
-        if (write && !full) {
-            // Track only write requests from explicit transactions.
-            if (!transactionInflights.addInflight(tx.id(), false)) {
-                return failedFuture(
-                        new TransactionException(TX_ALREADY_FINISHED_ERR, 
format(
-                                "Transaction is already finished 
[tableName={}, partId={}, txState={}].",
-                                tableName,
-                                partId,
-                                tx.state()
-                        )));
-            }
+        if (full) { // Full transaction retries are handled in postEnlist.
+            return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), 
request);
+        } else {
+            if (write) { // Track only write requests from explicit 
transactions.
+                if (!transactionInflights.addInflight(tx.id(), false)) {
+                    return failedFuture(
+                            new TransactionException(TX_ALREADY_FINISHED_ERR, 
format(
+                                    "Transaction is already finished 
[tableName={}, partId={}, txState={}].",
+                                    tableName,
+                                    partId,
+                                    tx.state()
+                            )));
+                }
 
-            return 
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(), 
request).thenApply(res -> {
-                assert noWriteChecker != null;
+                return 
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(), 
request).thenApply(res -> {
+                    assert noWriteChecker != null;
 
-                // Remove inflight if no replication was scheduled, otherwise 
inflight will be removed by delayed response.
-                if (noWriteChecker.test(res, request)) {
-                    transactionInflights.removeInflight(tx.id());
-                }
+                    // Remove inflight if no replication was scheduled, 
otherwise inflight will be removed by delayed response.
+                    if (noWriteChecker.test(res, request)) {
+                        transactionInflights.removeInflight(tx.id());
+                    }
+
+                    return res;
+                }).handle((r, e) -> {
+                    if (e != null) {
+                        if (retryOnLockConflict > 0 && 
matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) {
+                            transactionInflights.removeInflight(tx.id()); // 
Will be retried.
+
+                            return trackingInvoke(
+                                    tx,
+                                    partId,
+                                    ignored -> request,
+                                    false,
+                                    primaryReplicaAndConsistencyToken,
+                                    noWriteChecker,
+                                    retryOnLockConflict - 1
+                            );
+                        }
 
-                return res;
-            }).handle((r, e) -> {
-                if (e != null) {
-                    if (retryOnLockConflict > 0 && e.getCause() instanceof 
LockException) {
-                        transactionInflights.removeInflight(tx.id()); // Will 
be retried.
-
-                        return trackingInvoke(
-                                tx,
-                                partId,
-                                ignored -> request,
-                                full,
-                                primaryReplicaAndConsistencyToken,
-                                noWriteChecker,
-                                retryOnLockConflict - 1
-                        );
+                        sneakyThrow(e);
                     }
 
-                    ExceptionUtils.sneakyThrow(e);
-                }
+                    return completedFuture(r);
+                }).thenCompose(identity());
+            } else { // Explicit reads should be retried too.
+                return 
replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(), 
request).handle((r, e) -> {
+                    if (e != null) {
+                        if (retryOnLockConflict > 0 && 
matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) {
+                            return trackingInvoke(
+                                    tx,
+                                    partId,
+                                    ignored -> request,
+                                    false,
+                                    primaryReplicaAndConsistencyToken,
+                                    noWriteChecker,
+                                    retryOnLockConflict - 1
+                            );
+                        }
 
-                return completedFuture(r);
-            }).thenCompose(x -> x);
-        } else {
-            return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), 
request);
+                        sneakyThrow(e);
+                    }
+
+                    return completedFuture(r);
+                }).thenCompose(identity());
+            }
         }
     }
 
@@ -690,30 +706,25 @@ public class InternalTableImpl implements InternalTable {
             if (full) { // Full txn is already finished remotely. Just update 
local state.
                 txManager.finishFull(observableTimestampTracker, tx0.id(), e 
== null);
 
-                return e != null ? failedFuture(wrapReplicationException(e)) : 
completedFuture(r);
+                return e != null ? failedFuture(e) : completedFuture(r);
             }
 
             if (e != null) {
-                RuntimeException e0 = wrapReplicationException(e);
-
                 return tx0.rollbackAsync().handle((ignored, err) -> {
                     if (err != null) {
-                        e0.addSuppressed(err);
+                        e.addSuppressed(err);
                     }
-                    throw e0;
+                    sneakyThrow(e);
+                    return null;
                 }); // Preserve failed state.
             } else {
                 if (autoCommit) {
-                    return tx0.commitAsync()
-                            .exceptionally(ex -> {
-                                throw wrapReplicationException(ex);
-                            })
-                            .thenApply(ignored -> r);
+                    return tx0.commitAsync().thenApply(ignored -> r);
                 } else {
                     return completedFuture(r);
                 }
             }
-        }).thenCompose(x -> x);
+        }).thenCompose(identity());
     }
 
     /**
@@ -819,29 +830,24 @@ public class InternalTableImpl implements InternalTable {
     private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, 
InternalTransaction tx) {
         return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r, 
e) -> {
             if (e != null) {
-                RuntimeException e0 = wrapReplicationException(e);
-
                 return tx.finish(false, clock.now())
                         .handle((ignored, err) -> {
-
                             if (err != null) {
-                                e0.addSuppressed(err);
+                                e.addSuppressed(err);
                             }
-                            throw e0;
+
+                            sneakyThrow(e);
+                            return null;
                         }); // Preserve failed state.
             }
 
-            return tx.finish(true, clock.now())
-                    .exceptionally(ex -> {
-                        throw wrapReplicationException(ex);
-                    })
-                    .thenApply(ignored -> r);
-        }).thenCompose(x -> x);
+            return tx.finish(true, clock.now()).thenApply(ignored -> r);
+        }).thenCompose(identity());
     }
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, 
InternalTransaction tx) {
+    public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable 
InternalTransaction tx) {
         if (tx == null) {
             return evaluateReadOnlyPrimaryNode(
                     keyRow,
@@ -872,7 +878,7 @@ public class InternalTableImpl implements InternalTable {
                         .enlistmentConsistencyToken(enlistmentConsistencyToken)
                         .requestType(RW_GET)
                         .timestampLong(clock.nowLong())
-                        .full(tx == null)
+                        .full(false)
                         .coordinatorId(txo.coordinatorId())
                         .build(),
                 (res, req) -> false
@@ -1012,6 +1018,10 @@ public class InternalTableImpl implements InternalTable {
         boolean first = true;
 
         for (BinaryRow row : rows) {
+            if (row == null) {
+                continue;
+            }
+
             if (first) {
                 schemaVersion = row.schemaVersion();
                 first = false;
@@ -1056,7 +1066,7 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction 
tx) {
+    public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable 
InternalTransaction tx) {
         return enlistInTx(
                 row,
                 tx,
@@ -1078,7 +1088,7 @@ public class InternalTableImpl implements InternalTable {
 
     /** {@inheritDoc} */
     @Override
-    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, 
InternalTransaction tx) {
+    public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, 
@Nullable InternalTransaction tx) {
         return enlistInTx(
                 rows,
                 tx,
@@ -1091,9 +1101,29 @@ public class InternalTableImpl implements InternalTable {
     /** {@inheritDoc} */
     @Override
     public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, 
@Nullable BitSet deleted, int partition) {
+        return updateAllWithRetry(rows, deleted, partition, null);
+    }
+
+    /**
+     * Update all with retry.
+     *
+     * @param rows Rows.
+     * @param deleted Deleted.
+     * @param partition The partition.
+     * @param txStartTs Start timestamp.
+     * @return The future.
+     */
+    private CompletableFuture<Void> updateAllWithRetry(
+            Collection<BinaryRowEx> rows,
+            @Nullable BitSet deleted,
+            int partition,
+            @Nullable Long txStartTs
+    ) {
         InternalTransaction tx = txManager.begin(observableTimestampTracker);
         TablePartitionId partGroupId = new TablePartitionId(tableId, 
partition);
 
+        assert rows.stream().allMatch(row -> partitionId(row) == partition) : 
"Invalid batch for partition " + partition;
+
         CompletableFuture<Void> fut = enlistAndInvoke(
                 tx,
                 partition,
@@ -1102,7 +1132,20 @@ public class InternalTableImpl implements InternalTable {
                 null
         );
 
-        return postEnlist(fut, false, tx, true); // Will be committed in one 
RTT.
+        // Will be finished in one RTT.
+        return postEnlist(fut, false, tx, true).handle((r, e) -> {
+            if (e != null) {
+                long ts = (txStartTs == null) ? 
tx.startTimestamp().getPhysical() : txStartTs;
+
+                if (exceptionAllowsImplicitTxRetry(e) && 
coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) {
+                    return updateAllWithRetry(rows, deleted, partition, ts);
+                }
+
+                sneakyThrow(e);
+            }
+
+            return completedFuture(r);
+        }).thenCompose(identity());
     }
 
     /** {@inheritDoc} */
@@ -1702,7 +1745,7 @@ public class InternalTableImpl implements InternalTable {
             }
 
             return fut;
-        }).thenCompose(Function.identity());
+        }).thenCompose(identity());
     }
 
     /**
@@ -2159,34 +2202,6 @@ public class InternalTableImpl implements InternalTable {
                 });
     }
 
-    /**
-     * Casts any exception type to a client exception, wherein {@link 
ReplicationException} and {@link LockException} are wrapped to
-     * {@link TransactionException}, but another exceptions are wrapped to a 
common exception. The method does not wrap an exception if the
-     * exception already inherits type of {@link RuntimeException}.
-     *
-     * @param e An instance exception to cast to client side one.
-     * @return {@link IgniteException} An instance of client side exception.
-     */
-    private RuntimeException wrapReplicationException(Throwable e) {
-        if (e instanceof CompletionException) {
-            e = e.getCause();
-        }
-
-        RuntimeException e0;
-
-        if (e instanceof ReplicationException || e instanceof ConnectException 
|| e instanceof TimeoutException) {
-            e0 = withCause(TransactionException::new, 
TX_REPLICA_UNAVAILABLE_ERR, e);
-        } else if (e instanceof LockException) {
-            e0 = withCause(TransactionException::new, ACQUIRE_LOCK_ERR, e);
-        } else if (!(e instanceof RuntimeException)) {
-            e0 = withCause(IgniteException::new, INTERNAL_ERR, e);
-        } else {
-            e0 = (RuntimeException) e;
-        }
-
-        return e0;
-    }
-
     @Override
     public @Nullable PendingComparableValuesTracker<HybridTimestamp, Void> 
getPartitionSafeTimeTracker(int partitionId) {
         return safeTimeTrackerByPartitionId.get(partitionId);
@@ -2273,13 +2288,7 @@ public class InternalTableImpl implements InternalTable {
      * @param e Exception to check.
      * @return True if retrying is possible, false otherwise.
      */
-    private static boolean exceptionAllowsTxRetry(Throwable e) {
-        Throwable ex = unwrapCause(e);
-
-        while (ex instanceof TransactionException && ex.getCause() != null) {
-            ex = ex.getCause();
-        }
-
-        return ex instanceof LockException || ex instanceof 
PrimaryReplicaMissException;
+    private static boolean exceptionAllowsImplicitTxRetry(Throwable e) {
+        return matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR);
     }
 }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index a7b3fe44ab..530dcbca3f 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -247,7 +247,6 @@ public class ItTxTestCluster {
     protected String localNodeName;
 
     private final ClusterNodeResolver nodeResolver = new ClusterNodeResolver() 
{
-
         @Override
         public @Nullable ClusterNode getById(String id) {
             for (ClusterService service : cluster) {
@@ -425,7 +424,8 @@ public class ItTxTestCluster {
                     clusterService.messagingService(),
                     clock,
                     partitionOperationsExecutor,
-                    replicationConfiguration
+                    replicationConfiguration,
+                    executor
             ));
 
             replicaServices.put(node.name(), replicaSvc);
@@ -981,7 +981,8 @@ public class ItTxTestCluster {
                 client.messagingService(),
                 clientClock,
                 partitionOperationsExecutor,
-                replicationConfiguration
+                replicationConfiguration,
+                executor
         ));
 
         LOG.info("The client has been started");
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
new file mode 100644
index 0000000000..7c0dc5c40c
--- /dev/null
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.tx;
+
+import static org.apache.ignite.internal.lang.IgniteExceptionMapper.unchecked;
+
+import com.google.auto.service.AutoService;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.internal.lang.IgniteExceptionMapper;
+import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
+import org.apache.ignite.tx.TransactionException;
+
+/**
+ * Transaction module exception mapper.
+ *
+ * <p>Declared with {@code @AutoService(IgniteExceptionMappersProvider.class)}. The {@code mappers()} method returns two
+ * unchecked mappers: one for {@code LockException} and one for {@code ReplicationException}. Each builds a
+ * {@code TransactionException} from the source exception's trace id, error code and message, and passes the source
+ * exception itself as the last constructor argument (presumably the cause).
+ */
+@AutoService(IgniteExceptionMappersProvider.class)
+public class TransactionExceptionMapperProvider implements 
IgniteExceptionMappersProvider {
+    @Override
+    public Collection<IgniteExceptionMapper<?, ?>> mappers() {
+        List<IgniteExceptionMapper<?, ?>> mappers = new ArrayList<>();
+
+        mappers.add(unchecked(LockException.class, err -> new 
TransactionException(err.traceId(), err.code(), err.getMessage(), err)));
+        mappers.add(unchecked(ReplicationException.class,
+                err -> new TransactionException(err.traceId(), err.code(), 
err.getMessage(), err)));
+
+        return mappers;
+    }
+}


Reply via email to