This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f8b0785561 IGNITE-20870 Explain why snapshots lock is not taken in
PartitionReplicaListener (#3385)
f8b0785561 is described below
commit f8b078556189e5db2547aaf2484e7ab24ebd5975
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Mar 15 13:45:42 2024 +0400
IGNITE-20870 Explain why snapshots lock is not taken in
PartitionReplicaListener (#3385)
---
.../table/distributed/raft/PartitionListener.java | 14 +++++-----
.../replicator/PartitionReplicaListener.java | 31 ++++++++++++++++++++++
.../distributed/TestPartitionDataStorage.java | 9 ++-----
3 files changed, 41 insertions(+), 13 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
index 69bd2e2cd9..21a9565a85 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java
@@ -184,6 +184,8 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
+ ", mvAppliedIndex=" + storage.lastAppliedIndex()
+ ", txStateAppliedIndex=" +
txStateStorage.lastAppliedIndex() + "]";
+ Serializable result = null;
+
// NB: Make sure that ANY command we accept here updates
lastAppliedIndex+term info in one of the underlying
// storages!
// Otherwise, a gap between lastAppliedIndex from the point of
view of JRaft and our storage might appear.
@@ -195,8 +197,6 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
storage.acquirePartitionSnapshotsReadLock();
- Serializable result = null;
-
try {
if (command instanceof UpdateCommand) {
handleUpdateCommand((UpdateCommand) command, commandIndex,
commandTerm);
@@ -213,12 +213,10 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
} else {
assert false : "Command was not found [cmd=" + command +
']';
}
-
- clo.result(result);
} catch (IgniteInternalException e) {
- clo.result(e);
+ result = e;
} catch (CompletionException e) {
- clo.result(e.getCause());
+ result = e.getCause();
} catch (Throwable t) {
LOG.error(
"Unknown error while processing command
[commandIndex={}, commandTerm={}, command={}]",
@@ -231,6 +229,10 @@ public class PartitionListener implements
RaftGroupListener, BeforeApplyHandler
storage.releasePartitionSnapshotsReadLock();
}
+ // Completing the closure outside of the partition snapshots lock to
reduce the possibility of deadlocks, as it might
+ // trigger other actions taking the same locks.
+ clo.result(result);
+
if (command instanceof SafeTimePropagatingCommand) {
SafeTimePropagatingCommand safeTimePropagatingCommand =
(SafeTimePropagatingCommand) command;
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index fd21ca9721..b16202cabf 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -196,6 +196,30 @@ import org.jetbrains.annotations.Nullable;
/** Partition replication listener. */
public class PartitionReplicaListener implements ReplicaListener {
+ /**
+ * NB: this listener makes writes to the underlying MV partition storage
without taking the partition snapshots read lock.
+ * This causes the RAFT snapshots transferred to a follower to be slightly
inconsistent for a limited amount of time.
+ *
+ * <p>A RAFT snapshot of a partition consists of MV data, TX state data
and metadata (which includes RAFT applied index).
+ * Here, the 'slight' inconsistency is that MV data might be ahead of the
snapshot meta (namely, RAFT applied index) and TX state data.
+ *
+ * <p>This listener by its nature cannot advance RAFT applied index (as it
works out of the RAFT framework). This alone makes
+ * the partition 'slightly inconsistent' in the same way as defined above.
So, if we solve this inconsistency,
+ * we don't need to take the partition snapshots read lock as well.
+ *
+ * <p>The inconsistency does not cause any real problems because it is
further resolved.
+ * <ul>
+ * <li>If the follower with a 'slightly' inconsistent partition state
becomes a primary replica, this requires it to apply
+ * the whole available RAFT log from the leader before actually becoming a
primary; applying that log will remove the inconsistency</li>
+ * <li>If a node with this inconsistency is going to become a primary,
and it's already the leader, then the above will not help.
+ * But write intent resolution procedure will close the gap.</li>
+ * <li>2 items above solve the inconsistency for RW transactions</li>
+ * <li>For RO reading from such a 'slightly inconsistent' partition,
write intent resolution closes the gap as well.</li>
+ * </ul>
+ */
+ @SuppressWarnings("unused") // We use it as a placeholder for
documentation which can be linked using # and @see.
+ private static final Object INTERNAL_DOC_PLACEHOLDER = null;
+
/** Logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PartitionReplicaListener.class);
@@ -2542,6 +2566,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (!cmd.full()) {
// TODO: https://issues.apache.org/jira/browse/IGNITE-20124
Temporary code below
synchronized (safeTime) {
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdate(
cmd.txId(),
cmd.rowUuid(),
@@ -2580,6 +2605,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Try to avoid double write if an entry is already
replicated.
synchronized (safeTime) {
if (cmd.safeTime().compareTo(safeTime.current()) > 0) {
+ // We don't need to take the partition snapshots
read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdate(
cmd.txId(),
cmd.rowUuid(),
@@ -2667,6 +2693,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
if (skipDelayedAck) {
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
synchronized (safeTime) {
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
@@ -2684,6 +2711,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
} else {
// TODO:
https://issues.apache.org/jira/browse/IGNITE-20124 Temporary code below
synchronized (safeTime) {
+ // We don't need to take the partition snapshots read
lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
@@ -2718,6 +2746,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
// Try to avoid double write if an entry is
already replicated.
synchronized (safeTime) {
if
(cmd.safeTime().compareTo(safeTime.current()) > 0) {
+ // We don't need to take the partition
snapshots read lock, see #INTERNAL_DOC_PLACEHOLDER why.
storageUpdateHandler.handleUpdateAll(
cmd.txId(),
cmd.rowsToUpdate(),
@@ -3485,6 +3514,8 @@ public class PartitionReplicaListener implements
ReplicaListener {
CompletableFuture<?> future = rowCleanupMap.computeIfAbsent(rowId, k
-> {
// The cleanup for this row has already been triggered. For
example, we are resolving a write intent for an RW transaction
// and a concurrent RO transaction resolves the same row, hence
computeIfAbsent.
+
+ // We don't need to take the partition snapshots read lock, see
#INTERNAL_DOC_PLACEHOLDER why.
return txManager.executeWriteIntentSwitchAsync(() ->
inBusyLock(busyLock,
() -> storageUpdateHandler.switchWriteIntents(
txId,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index 2b8a78aa63..79fd8cea72 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -19,8 +19,6 @@ package org.apache.ignite.distributed;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.MvPartitionStorage;
@@ -47,8 +45,6 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
private final MvPartitionStorage partitionStorage;
- private final Lock partitionSnapshotsLock = new ReentrantLock();
-
private final RaftGroupConfigurationConverter configurationConverter = new
RaftGroupConfigurationConverter();
/** Constructor. */
@@ -77,15 +73,14 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
return partitionStorage.runConsistently(closure);
}
- @SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override
public void acquirePartitionSnapshotsReadLock() {
- partitionSnapshotsLock.lock();
+ // There is no 'write' side, so we don't need to take any lock.
}
@Override
public void releasePartitionSnapshotsReadLock() {
- partitionSnapshotsLock.unlock();
+ // There is no 'write' side, so we don't need to release any
lock.
}
@Override