This is an automated email from the ASF dual-hosted git repository. sanpwc pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new f8b0785561 IGNITE-20870 Explain why snapshots lock is not taken in PartitionReplicaListener (#3385) f8b0785561 is described below commit f8b078556189e5db2547aaf2484e7ab24ebd5975 Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Fri Mar 15 13:45:42 2024 +0400 IGNITE-20870 Explain why snapshots lock is not taken in PartitionReplicaListener (#3385) --- .../table/distributed/raft/PartitionListener.java | 14 +++++----- .../replicator/PartitionReplicaListener.java | 31 ++++++++++++++++++++++ .../distributed/TestPartitionDataStorage.java | 9 ++----- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java index 69bd2e2cd9..21a9565a85 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java @@ -184,6 +184,8 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler + ", mvAppliedIndex=" + storage.lastAppliedIndex() + ", txStateAppliedIndex=" + txStateStorage.lastAppliedIndex() + "]"; + Serializable result = null; + // NB: Make sure that ANY command we accept here updates lastAppliedIndex+term info in one of the underlying // storages! // Otherwise, a gap between lastAppliedIndex from the point of view of JRaft and our storage might appear. 
@@ -195,8 +197,6 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler storage.acquirePartitionSnapshotsReadLock(); - Serializable result = null; - try { if (command instanceof UpdateCommand) { handleUpdateCommand((UpdateCommand) command, commandIndex, commandTerm); @@ -213,12 +213,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler } else { assert false : "Command was not found [cmd=" + command + ']'; } - - clo.result(result); } catch (IgniteInternalException e) { - clo.result(e); + result = e; } catch (CompletionException e) { - clo.result(e.getCause()); + result = e.getCause(); } catch (Throwable t) { LOG.error( "Unknown error while processing command [commandIndex={}, commandTerm={}, command={}]", @@ -231,6 +229,10 @@ public class PartitionListener implements RaftGroupListener, BeforeApplyHandler storage.releasePartitionSnapshotsReadLock(); } + // Completing the closure out of the partition snapshots lock to reduce possibility of deadlocks as it might + // trigger other actions taking same locks. + clo.result(result); + if (command instanceof SafeTimePropagatingCommand) { SafeTimePropagatingCommand safeTimePropagatingCommand = (SafeTimePropagatingCommand) command; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java index fd21ca9721..b16202cabf 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java @@ -196,6 +196,30 @@ import org.jetbrains.annotations.Nullable; /** Partition replication listener. 
*/ public class PartitionReplicaListener implements ReplicaListener { + /** + * NB: this listener makes writes to the underlying MV partition storage without taking the partition snapshots read lock. + * This causes the RAFT snapshots transferred to a follower to be slightly inconsistent for a limited amount of time. + * + * <p>A RAFT snapshot of a partition consists of MV data, TX state data and metadata (which includes the RAFT applied index). + * Here, the 'slight' inconsistency is that MV data might be ahead of the snapshot meta (namely, the RAFT applied index) and TX state data. + * + * <p>This listener by its nature cannot advance the RAFT applied index (as it works outside the RAFT framework). This alone makes + * the partition 'slightly inconsistent' in the same way as defined above. So, whatever resolves this inconsistency also resolves + * the one caused by lock-less writes, and we don't need to take the partition snapshots read lock either. + * + * <p>The inconsistency does not cause any real problems because it is eventually resolved: + * <ul> + * <li>If a follower with a 'slightly inconsistent' partition state becomes a primary replica, it is required to apply + * the whole available RAFT log from the leader before actually becoming a primary; this application removes the inconsistency.</li> + * <li>If a node with this inconsistency is going to become a primary and it is already the leader, the above does not help, + * but the write intent resolution procedure closes the gap.</li> + * <li>The two items above resolve the inconsistency for RW transactions.</li> + * <li>For RO transactions reading from such a 'slightly inconsistent' partition, write intent resolution closes the gap as well.</li> + * </ul> + */ + @SuppressWarnings("unused") // We use it as a documentation placeholder which can be linked using # and @see. + private static final Object INTERNAL_DOC_PLACEHOLDER = null; + /** Logger.
*/ private static final IgniteLogger LOG = Loggers.forClass(PartitionReplicaListener.class); @@ -2542,6 +2566,7 @@ public class PartitionReplicaListener implements ReplicaListener { if (!cmd.full()) { // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below synchronized (safeTime) { + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdate( cmd.txId(), cmd.rowUuid(), @@ -2580,6 +2605,7 @@ public class PartitionReplicaListener implements ReplicaListener { // Try to avoid double write if an entry is already replicated. synchronized (safeTime) { if (cmd.safeTime().compareTo(safeTime.current()) > 0) { + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdate( cmd.txId(), cmd.rowUuid(), @@ -2667,6 +2693,7 @@ public class PartitionReplicaListener implements ReplicaListener { if (skipDelayedAck) { // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below synchronized (safeTime) { + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdateAll( cmd.txId(), cmd.rowsToUpdate(), @@ -2684,6 +2711,7 @@ public class PartitionReplicaListener implements ReplicaListener { } else { // TODO: https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below synchronized (safeTime) { + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. storageUpdateHandler.handleUpdateAll( cmd.txId(), cmd.rowsToUpdate(), @@ -2718,6 +2746,7 @@ public class PartitionReplicaListener implements ReplicaListener { // Try to avoid double write if an entry is already replicated. synchronized (safeTime) { if (cmd.safeTime().compareTo(safeTime.current()) > 0) { + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. 
storageUpdateHandler.handleUpdateAll( cmd.txId(), cmd.rowsToUpdate(), @@ -3485,6 +3514,8 @@ public class PartitionReplicaListener implements ReplicaListener { CompletableFuture<?> future = rowCleanupMap.computeIfAbsent(rowId, k -> { // The cleanup for this row has already been triggered. For example, we are resolving a write intent for an RW transaction // and a concurrent RO transaction resolves the same row, hence computeIfAbsent. + + // We don't need to take the partition snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why. return txManager.executeWriteIntentSwitchAsync(() -> inBusyLock(busyLock, () -> storageUpdateHandler.switchWriteIntents( txId, diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java index 2b8a78aa63..79fd8cea72 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java @@ -19,8 +19,6 @@ package org.apache.ignite.distributed; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.storage.MvPartitionStorage; @@ -47,8 +45,6 @@ public class TestPartitionDataStorage implements PartitionDataStorage { private final MvPartitionStorage partitionStorage; - private final Lock partitionSnapshotsLock = new ReentrantLock(); - private final RaftGroupConfigurationConverter configurationConverter = new RaftGroupConfigurationConverter(); /** Constructor. 
*/ @@ -77,15 +73,14 @@ public class TestPartitionDataStorage implements PartitionDataStorage { return partitionStorage.runConsistently(closure); } - @SuppressWarnings("LockAcquiredButNotSafelyReleased") @Override public void acquirePartitionSnapshotsReadLock() { - partitionSnapshotsLock.lock(); + // There is no 'write' side, so we don't need to take any lock. } @Override public void releasePartitionSnapshotsReadLock() { - partitionSnapshotsLock.unlock(); + // There is no 'write' side, so we don't need to release any lock. } @Override