This is an automated email from the ASF dual-hosted git repository. ascherbakov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new c25c060763 IGNITE-22130 Fix retries logic. c25c060763 is described below commit c25c060763a17dcf2a746d6c5d7bd4c150c391eb Author: Alexey Scherbakov <alexey.scherbak...@gmail.com> AuthorDate: Mon May 13 16:50:28 2024 +0300 IGNITE-22130 Fix retries logic. --- .../ignite/internal/util/ExceptionUtils.java | 24 ++ .../ignite/internal/replicator/ReplicaService.java | 36 ++- .../runner/app/ItIgniteNodeRestartTest.java | 3 +- .../ignite/internal/table/ItTableScanTest.java | 6 +- .../org/apache/ignite/internal/app/IgniteImpl.java | 3 +- .../rebalance/ItRebalanceDistributedTest.java | 3 +- .../distributed/storage/InternalTableImpl.java | 243 +++++++++++---------- .../apache/ignite/distributed/ItTxTestCluster.java | 7 +- .../tx/TransactionExceptionMapperProvider.java | 46 ++++ 9 files changed, 240 insertions(+), 131 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java index dbad400afa..63a365689c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java @@ -567,6 +567,30 @@ public final class ExceptionUtils { return INTERNAL_ERR; } + /** + * Determine if a particular error matches any of the passed error codes. + * + * @param t Unwrapped throwable. + * @param code The code. + * @param codes Other codes. + * @return {@code True} if the error code extracted from the throwable equals {@code code} or any of {@code codes}. + */ + public static boolean matchAny(Throwable t, int code, int...
codes) { + int errCode = extractCodeFrom(t); + + if (code == errCode) { + return true; + } + + for (int c0 : codes) { + if (c0 == errCode) { + return true; + } + } + + return false; + } + // TODO: https://issues.apache.org/jira/browse/IGNITE-19870 // This method should be removed or re-worked and usages should be changed to IgniteExceptionMapperUtil.mapToPublicException. /** diff --git a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java index 245b3b4bdb..a174eae3b1 100644 --- a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java +++ b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java @@ -17,17 +17,22 @@ package org.apache.ignite.internal.replicator; +import static java.util.concurrent.TimeUnit.MILLISECONDS; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; +import static org.apache.ignite.internal.util.ExceptionUtils.matchAny; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.lang.NodeStoppingException; @@ -45,10 +50,14 @@ import 
org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.replicator.message.ReplicaResponse; import org.apache.ignite.internal.replicator.message.TimestampAware; import org.apache.ignite.network.ClusterNode; +import org.jetbrains.annotations.Nullable; import org.jetbrains.annotations.TestOnly; /** The service is intended to execute requests on replicas. */ public class ReplicaService { + /** Retry timeout. */ + private static final int RETRY_TIMEOUT_MILLIS = 10; + /** Message service. */ private final MessagingService messagingService; @@ -59,6 +68,8 @@ public class ReplicaService { private final ReplicationConfiguration replicationConfiguration; + private @Nullable final ScheduledExecutorService retryExecutor; + /** Requests to retry. */ private final Map<String, CompletableFuture<NetworkMessage>> pendingInvokes = new ConcurrentHashMap<>(); @@ -82,7 +93,8 @@ public class ReplicaService { messagingService, clock, ForkJoinPool.commonPool(), - replicationConfiguration + replicationConfiguration, + null ); } @@ -93,24 +105,27 @@ public class ReplicaService { * @param clock A hybrid logical clock. * @param partitionOperationsExecutor Partition operation executor. * @param replicationConfiguration Replication configuration. + * @param retryExecutor Retry executor. */ public ReplicaService( MessagingService messagingService, HybridClock clock, Executor partitionOperationsExecutor, - ReplicationConfiguration replicationConfiguration + ReplicationConfiguration replicationConfiguration, + @Nullable ScheduledExecutorService retryExecutor ) { this.messagingService = messagingService; this.clock = clock; this.partitionOperationsExecutor = partitionOperationsExecutor; this.replicationConfiguration = replicationConfiguration; + this.retryExecutor = retryExecutor; } /** * Sends request to the replica node. * * @param targetNodeConsistentId A consistent id of the replica node.. - * @param req Replica request. + * @param req Replica request. 
* @return Response future with either evaluation result or completed exceptionally. * @see NodeStoppingException If either supplier or demander node is stopping. * @see ReplicaUnavailableException If replica with given replication group id doesn't exist or not started yet. @@ -210,7 +225,14 @@ public class ReplicaService { return null; }); } else { - res.completeExceptionally(errResp.throwable()); + if (retryExecutor != null && matchAny(unwrapCause(errResp.throwable()), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR)) { + retryExecutor.schedule( + // Need to resubmit again to pool which is valid for synchronous IO execution. + () -> partitionOperationsExecutor.execute(() -> res.completeExceptionally(errResp.throwable())), + RETRY_TIMEOUT_MILLIS, MILLISECONDS); + } else { + res.completeExceptionally(errResp.throwable()); + } } } else { res.complete((R) ((ReplicaResponse) response).result()); @@ -224,7 +246,7 @@ public class ReplicaService { /** * Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing. * - * @param node Replica node. + * @param node Replica node. * @param request Request. * @return Response future with either evaluation result or completed exceptionally. * @see NodeStoppingException If either supplier or demander node is stopping. @@ -252,8 +274,8 @@ public class ReplicaService { /** * Sends a request to the given replica {@code node} and returns a future that will be completed with a result of request processing. * - * @param node Replica node. - * @param request Request. + * @param node Replica node. + * @param request Request. * @param storageId Storage id. * @return Response future with either evaluation result or completed exceptionally. * @see NodeStoppingException If either supplier or demander node is stopping. 
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index cdb511f090..0c0ead5bf1 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -386,7 +386,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { messagingServiceReturningToStorageOperationsPool, hybridClock, threadPoolsManager.partitionOperationsExecutor(), - replicationConfiguration + replicationConfiguration, + threadPoolsManager.commonScheduler() ); var lockManager = new HeapLockManager(); diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java index d1b31ab596..9fd16e4dc5 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java @@ -135,7 +135,11 @@ public class ItTableScanTest extends BaseSqlIntegrationTest { private void checkResourcesAreReleased(IgniteImpl ignite) { checkCursorsAreClosed(ignite); - assertTrue(ignite.txManager().lockManager().isEmpty()); + try { + assertTrue(waitForCondition(() -> ignite.txManager().lockManager().isEmpty(), 1000)); + } catch (InterruptedException e) { + fail("Unexpected interruption"); + } } /** diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 8503e0f529..cf3d4ca1b0 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -610,7 +610,8 @@ public class IgniteImpl implements Ignite { messagingServiceReturningToStorageOperationsPool, clock, threadPoolsManager.partitionOperationsExecutor(), - replicationConfig + replicationConfig, + threadPoolsManager.commonScheduler() ); LongSupplier partitionIdleSafeTimePropagationPeriodMsSupplier = partitionIdleSafeTimePropagationPeriodMsSupplier(replicationConfig); diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java index 0c742a9801..e08d866cd0 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java @@ -1098,7 +1098,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest { clusterService.messagingService(), hybridClock, threadPoolsManager.partitionOperationsExecutor(), - replicationConfiguration + replicationConfiguration, + threadPoolsManager.commonScheduler() ); var resourcesRegistry = new RemotelyTriggeredResourceRegistry(); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index 8ea498f868..9692a2f908 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -21,6 +21,7 @@ import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap; import static java.util.concurrent.CompletableFuture.completedFuture; import static 
java.util.concurrent.CompletableFuture.failedFuture; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.function.Function.identity; import static org.apache.ignite.internal.lang.IgniteStringFormatter.format; import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_DELETE_ALL; import static org.apache.ignite.internal.table.distributed.replicator.action.RequestType.RW_GET; @@ -29,19 +30,20 @@ import static org.apache.ignite.internal.table.distributed.storage.RowBatch.allR import static org.apache.ignite.internal.util.CompletableFutures.completedOrFailedFuture; import static org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.internal.util.ExceptionUtils.matchAny; +import static org.apache.ignite.internal.util.ExceptionUtils.sneakyThrow; import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause; import static org.apache.ignite.internal.util.ExceptionUtils.withCause; import static org.apache.ignite.internal.util.FastTimestamps.coarseCurrentTimeMillis; import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR; +import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR; import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_ALREADY_FINISHED_ERR; import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_FAILED_READ_WRITE_OPERATION_ERR; -import static org.apache.ignite.lang.ErrorGroups.Transactions.TX_REPLICA_UNAVAILABLE_ERR; import it.unimi.dsi.fastutil.ints.Int2ObjectMap; import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; -import java.net.ConnectException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.BitSet; @@ 
-52,12 +54,10 @@ import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.Flow.Publisher; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.function.BiFunction; @@ -75,8 +75,6 @@ import org.apache.ignite.internal.placementdriver.ReplicaMeta; import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.ReplicationGroupId; import org.apache.ignite.internal.replicator.TablePartitionId; -import org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException; -import org.apache.ignite.internal.replicator.exception.ReplicationException; import org.apache.ignite.internal.replicator.message.ReplicaRequest; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.BinaryRowEx; @@ -101,16 +99,13 @@ import org.apache.ignite.internal.table.distributed.replication.request.SwapRowR import org.apache.ignite.internal.table.distributed.replicator.action.RequestType; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.InternalTransaction; -import org.apache.ignite.internal.tx.LockException; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.tx.impl.TransactionInflights; import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage; import org.apache.ignite.internal.util.CollectionUtils; -import org.apache.ignite.internal.util.ExceptionUtils; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.PendingComparableValuesTracker; import org.apache.ignite.internal.utils.PrimaryReplica; 
-import org.apache.ignite.lang.IgniteException; import org.apache.ignite.network.ClusterNode; import org.apache.ignite.network.ClusterNodeResolver; import org.apache.ignite.tx.TransactionException; @@ -365,16 +360,16 @@ public class InternalTableImpl implements InternalTable { if (implicit) { long ts = (txStartTs == null) ? actualTx.startTimestamp().getPhysical() : txStartTs; - if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { + if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { return enlistInTx(row, null, fac, noWriteChecker, ts); } } - throw wrapReplicationException(e); + sneakyThrow(e); } return completedFuture(r); - }).thenCompose(x -> x); + }).thenCompose(identity()); } /** @@ -484,16 +479,16 @@ public class InternalTableImpl implements InternalTable { if (implicit) { long ts = (txStartTs == null) ? actualTx.startTimestamp().getPhysical() : txStartTs; - if (exceptionAllowsTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { + if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { return enlistInTx(keyRows, null, fac, reducer, noOpChecker, ts); } } - throw wrapReplicationException(e); + sneakyThrow(e); } return completedFuture(r); - }).thenCompose(x -> x); + }).thenCompose(identity()); } private InternalTransaction startImplicitRwTxIfNeeded(@Nullable InternalTransaction tx) { @@ -626,50 +621,71 @@ public class InternalTableImpl implements InternalTable { || request instanceof MultipleRowPkReplicaRequest && ((MultipleRowPkReplicaRequest) request).requestType() != RW_GET_ALL || request instanceof SwapRowReplicaRequest; - if (write && !full) { - // Track only write requests from explicit transactions. 
- if (!transactionInflights.addInflight(tx.id(), false)) { - return failedFuture( - new TransactionException(TX_ALREADY_FINISHED_ERR, format( - "Transaction is already finished [tableName={}, partId={}, txState={}].", - tableName, - partId, - tx.state() - ))); - } + if (full) { // Full transaction retries are handled in postEnlist. + return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request); + } else { + if (write) { // Track only write requests from explicit transactions. + if (!transactionInflights.addInflight(tx.id(), false)) { + return failedFuture( + new TransactionException(TX_ALREADY_FINISHED_ERR, format( + "Transaction is already finished [tableName={}, partId={}, txState={}].", + tableName, + partId, + tx.state() + ))); + } - return replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(), request).thenApply(res -> { - assert noWriteChecker != null; + return replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(), request).thenApply(res -> { + assert noWriteChecker != null; - // Remove inflight if no replication was scheduled, otherwise inflight will be removed by delayed response. - if (noWriteChecker.test(res, request)) { - transactionInflights.removeInflight(tx.id()); - } + // Remove inflight if no replication was scheduled, otherwise inflight will be removed by delayed response. + if (noWriteChecker.test(res, request)) { + transactionInflights.removeInflight(tx.id()); + } + + return res; + }).handle((r, e) -> { + if (e != null) { + if (retryOnLockConflict > 0 && matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) { + transactionInflights.removeInflight(tx.id()); // Will be retried. 
+ + return trackingInvoke( + tx, + partId, + ignored -> request, + false, + primaryReplicaAndConsistencyToken, + noWriteChecker, + retryOnLockConflict - 1 + ); + } - return res; - }).handle((r, e) -> { - if (e != null) { - if (retryOnLockConflict > 0 && e.getCause() instanceof LockException) { - transactionInflights.removeInflight(tx.id()); // Will be retried. - - return trackingInvoke( - tx, - partId, - ignored -> request, - full, - primaryReplicaAndConsistencyToken, - noWriteChecker, - retryOnLockConflict - 1 - ); + sneakyThrow(e); } - ExceptionUtils.sneakyThrow(e); - } + return completedFuture(r); + }).thenCompose(identity()); + } else { // Explicit reads should be retried too. + return replicaSvc.<R>invoke(primaryReplicaAndConsistencyToken.get1(), request).handle((r, e) -> { + if (e != null) { + if (retryOnLockConflict > 0 && matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR)) { + return trackingInvoke( + tx, + partId, + ignored -> request, + false, + primaryReplicaAndConsistencyToken, + noWriteChecker, + retryOnLockConflict - 1 + ); + } - return completedFuture(r); - }).thenCompose(x -> x); - } else { - return replicaSvc.invoke(primaryReplicaAndConsistencyToken.get1(), request); + sneakyThrow(e); + } + + return completedFuture(r); + }).thenCompose(identity()); + } } } @@ -690,30 +706,25 @@ public class InternalTableImpl implements InternalTable { if (full) { // Full txn is already finished remotely. Just update local state. txManager.finishFull(observableTimestampTracker, tx0.id(), e == null); - return e != null ? failedFuture(wrapReplicationException(e)) : completedFuture(r); + return e != null ? failedFuture(e) : completedFuture(r); } if (e != null) { - RuntimeException e0 = wrapReplicationException(e); - return tx0.rollbackAsync().handle((ignored, err) -> { if (err != null) { - e0.addSuppressed(err); + e.addSuppressed(err); } - throw e0; + sneakyThrow(e); + return null; }); // Preserve failed state. 
} else { if (autoCommit) { - return tx0.commitAsync() - .exceptionally(ex -> { - throw wrapReplicationException(ex); - }) - .thenApply(ignored -> r); + return tx0.commitAsync().thenApply(ignored -> r); } else { return completedFuture(r); } } - }).thenCompose(x -> x); + }).thenCompose(identity()); } /** @@ -819,29 +830,24 @@ public class InternalTableImpl implements InternalTable { private <R> CompletableFuture<R> postEvaluate(CompletableFuture<R> fut, InternalTransaction tx) { return fut.handle((BiFunction<R, Throwable, CompletableFuture<R>>) (r, e) -> { if (e != null) { - RuntimeException e0 = wrapReplicationException(e); - return tx.finish(false, clock.now()) .handle((ignored, err) -> { - if (err != null) { - e0.addSuppressed(err); + e.addSuppressed(err); } - throw e0; + + sneakyThrow(e); + return null; }); // Preserve failed state. } - return tx.finish(true, clock.now()) - .exceptionally(ex -> { - throw wrapReplicationException(ex); - }) - .thenApply(ignored -> r); - }).thenCompose(x -> x); + return tx.finish(true, clock.now()).thenApply(ignored -> r); + }).thenCompose(identity()); } /** {@inheritDoc} */ @Override - public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, InternalTransaction tx) { + public CompletableFuture<BinaryRow> get(BinaryRowEx keyRow, @Nullable InternalTransaction tx) { if (tx == null) { return evaluateReadOnlyPrimaryNode( keyRow, @@ -872,7 +878,7 @@ public class InternalTableImpl implements InternalTable { .enlistmentConsistencyToken(enlistmentConsistencyToken) .requestType(RW_GET) .timestampLong(clock.nowLong()) - .full(tx == null) + .full(false) .coordinatorId(txo.coordinatorId()) .build(), (res, req) -> false @@ -1012,6 +1018,10 @@ public class InternalTableImpl implements InternalTable { boolean first = true; for (BinaryRow row : rows) { + if (row == null) { + continue; + } + if (first) { schemaVersion = row.schemaVersion(); first = false; @@ -1056,7 +1066,7 @@ public class InternalTableImpl implements InternalTable { /** 
{@inheritDoc} */ @Override - public CompletableFuture<Void> upsert(BinaryRowEx row, InternalTransaction tx) { + public CompletableFuture<Void> upsert(BinaryRowEx row, @Nullable InternalTransaction tx) { return enlistInTx( row, tx, @@ -1078,7 +1088,7 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override - public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, InternalTransaction tx) { + public CompletableFuture<Void> upsertAll(Collection<BinaryRowEx> rows, @Nullable InternalTransaction tx) { return enlistInTx( rows, tx, @@ -1091,9 +1101,29 @@ public class InternalTableImpl implements InternalTable { /** {@inheritDoc} */ @Override public CompletableFuture<Void> updateAll(Collection<BinaryRowEx> rows, @Nullable BitSet deleted, int partition) { + return updateAllWithRetry(rows, deleted, partition, null); + } + + /** + * Update all with retry. + * + * @param rows Rows. + * @param deleted Deleted. + * @param partition The partition. + * @param txStartTs Start timestamp. + * @return The future. + */ + private CompletableFuture<Void> updateAllWithRetry( + Collection<BinaryRowEx> rows, + @Nullable BitSet deleted, + int partition, + @Nullable Long txStartTs + ) { InternalTransaction tx = txManager.begin(observableTimestampTracker); TablePartitionId partGroupId = new TablePartitionId(tableId, partition); + assert rows.stream().allMatch(row -> partitionId(row) == partition) : "Invalid batch for partition " + partition; + CompletableFuture<Void> fut = enlistAndInvoke( tx, partition, @@ -1102,7 +1132,20 @@ public class InternalTableImpl implements InternalTable { null ); - return postEnlist(fut, false, tx, true); // Will be committed in one RTT. + // Will be finished in one RTT. + return postEnlist(fut, false, tx, true).handle((r, e) -> { + if (e != null) { + long ts = (txStartTs == null) ? 
tx.startTimestamp().getPhysical() : txStartTs; + + if (exceptionAllowsImplicitTxRetry(e) && coarseCurrentTimeMillis() - ts < implicitTransactionTimeout) { + return updateAllWithRetry(rows, deleted, partition, ts); + } + + sneakyThrow(e); + } + + return completedFuture(r); + }).thenCompose(identity()); } /** {@inheritDoc} */ @@ -1702,7 +1745,7 @@ public class InternalTableImpl implements InternalTable { } return fut; - }).thenCompose(Function.identity()); + }).thenCompose(identity()); } /** @@ -2159,34 +2202,6 @@ public class InternalTableImpl implements InternalTable { }); } - /** - * Casts any exception type to a client exception, wherein {@link ReplicationException} and {@link LockException} are wrapped to - * {@link TransactionException}, but another exceptions are wrapped to a common exception. The method does not wrap an exception if the - * exception already inherits type of {@link RuntimeException}. - * - * @param e An instance exception to cast to client side one. - * @return {@link IgniteException} An instance of client side exception. - */ - private RuntimeException wrapReplicationException(Throwable e) { - if (e instanceof CompletionException) { - e = e.getCause(); - } - - RuntimeException e0; - - if (e instanceof ReplicationException || e instanceof ConnectException || e instanceof TimeoutException) { - e0 = withCause(TransactionException::new, TX_REPLICA_UNAVAILABLE_ERR, e); - } else if (e instanceof LockException) { - e0 = withCause(TransactionException::new, ACQUIRE_LOCK_ERR, e); - } else if (!(e instanceof RuntimeException)) { - e0 = withCause(IgniteException::new, INTERNAL_ERR, e); - } else { - e0 = (RuntimeException) e; - } - - return e0; - } - @Override public @Nullable PendingComparableValuesTracker<HybridTimestamp, Void> getPartitionSafeTimeTracker(int partitionId) { return safeTimeTrackerByPartitionId.get(partitionId); @@ -2273,13 +2288,7 @@ public class InternalTableImpl implements InternalTable { * @param e Exception to check. 
* @return True if retrying is possible, false otherwise. */ - private static boolean exceptionAllowsTxRetry(Throwable e) { - Throwable ex = unwrapCause(e); - - while (ex instanceof TransactionException && ex.getCause() != null) { - ex = ex.getCause(); - } - - return ex instanceof LockException || ex instanceof PrimaryReplicaMissException; + private static boolean exceptionAllowsImplicitTxRetry(Throwable e) { + return matchAny(unwrapCause(e), ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR); } } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java index a7b3fe44ab..530dcbca3f 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java @@ -247,7 +247,6 @@ public class ItTxTestCluster { protected String localNodeName; private final ClusterNodeResolver nodeResolver = new ClusterNodeResolver() { - @Override public @Nullable ClusterNode getById(String id) { for (ClusterService service : cluster) { @@ -425,7 +424,8 @@ public class ItTxTestCluster { clusterService.messagingService(), clock, partitionOperationsExecutor, - replicationConfiguration + replicationConfiguration, + executor )); replicaServices.put(node.name(), replicaSvc); @@ -981,7 +981,8 @@ public class ItTxTestCluster { client.messagingService(), clientClock, partitionOperationsExecutor, - replicationConfiguration + replicationConfiguration, + executor )); LOG.info("The client has been started"); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java new file mode 100644 index 0000000000..7c0dc5c40c --- /dev/null +++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TransactionExceptionMapperProvider.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx; + +import static org.apache.ignite.internal.lang.IgniteExceptionMapper.unchecked; + +import com.google.auto.service.AutoService; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import org.apache.ignite.internal.lang.IgniteExceptionMapper; +import org.apache.ignite.internal.lang.IgniteExceptionMappersProvider; +import org.apache.ignite.internal.replicator.exception.ReplicationException; +import org.apache.ignite.tx.TransactionException; + +/** + * Transaction module exception mapper. 
+ */ +@AutoService(IgniteExceptionMappersProvider.class) +public class TransactionExceptionMapperProvider implements IgniteExceptionMappersProvider { + @Override + public Collection<IgniteExceptionMapper<?, ?>> mappers() { + List<IgniteExceptionMapper<?, ?>> mappers = new ArrayList<>(); + + mappers.add(unchecked(LockException.class, err -> new TransactionException(err.traceId(), err.code(), err.getMessage(), err))); + mappers.add(unchecked(ReplicationException.class, + err -> new TransactionException(err.traceId(), err.code(), err.getMessage(), err))); + + return mappers; + } +}