sashapolo commented on code in PR #5310:
URL: https://github.com/apache/ignite-3/pull/5310#discussion_r1974268369
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -600,7 +620,7 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
.thenCompose(v ->
executeUnderZoneWriteLock(zonePartitionId.zoneId(), () -> {
replicationGroupIds.add(zonePartitionId);
- return falseCompletedFuture();
+ return trueCompletedFuture();
Review Comment:
This is crazy, why didn't we have any failing tests before?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
this::calculateZoneAssignments,
rebalanceRetryDelayConfiguration
);
-
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
+ var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
Review Comment:
Should the `storageIndexTracker` be part of `ZonePartitionResources`?
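A minimal sketch of what that could look like, assuming the resources object creates the tracker itself and closes it together with the other per-partition resources. `IndexTracker` and `ZonePartitionResourcesSketch` are hypothetical, simplified stand-ins for `PendingComparableValuesTracker<Long, Void>` and `ZonePartitionResources`, not the real classes.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for PendingComparableValuesTracker<Long, Void>:
// it keeps only the highest index seen so far and a closed flag.
class IndexTracker implements AutoCloseable {
    private final AtomicLong current;
    private volatile boolean closed;

    IndexTracker(long initial) {
        this.current = new AtomicLong(initial);
    }

    void update(long newValue) {
        if (closed) {
            throw new IllegalStateException("Tracker is closed");
        }
        // Monotonic: a smaller index never moves the tracker backwards.
        current.accumulateAndGet(newValue, Math::max);
    }

    long current() {
        return current.get();
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical shape of ZonePartitionResources if it owned the tracker: the tracker
// is created with the other per-partition resources and closed when they are released,
// so allocateZonePartitionResources(...) would not need an extra tracker parameter.
class ZonePartitionResourcesSketch implements AutoCloseable {
    private final IndexTracker storageIndexTracker = new IndexTracker(0L);

    IndexTracker storageIndexTracker() {
        return storageIndexTracker;
    }

    @Override
    public void close() {
        storageIndexTracker.close();
    }
}
```

With this ownership, callers would read the tracker from the resources instead of creating it next to them and threading it through every allocation call.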
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -237,6 +237,12 @@ private IgniteBiTuple<Serializable, Boolean> processCrossTableProcessorsCommand(
) {
IgniteBiTuple<Serializable, Boolean> result = new IgniteBiTuple<>(null, false);
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24517 Remove. In case of zero tables we still should
Review Comment:
You can remove this code; it will be fixed automatically by
https://github.com/apache/ignite-3/pull/5237
##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java:
##########
@@ -150,7 +158,15 @@ private static void assertThatStorageIsStopped(ZonePartitionResources resources)
);
}
- private ZonePartitionResources allocatePartitionResources(ZonePartitionId zonePartitionId, int partitionCount) {
- return bypassingThreadAssertions(() -> manager.allocateZonePartitionResources(zonePartitionId, partitionCount));
+ private ZonePartitionResources allocatePartitionResources(
+ ZonePartitionId zonePartitionId,
+ int partitionCount,
+ PendingComparableValuesTracker<Long, Void> storageIndexTracker
Review Comment:
You always pass the same value that is already stored in a field, so this
parameter is redundant.
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
this::calculateZoneAssignments,
rebalanceRetryDelayConfiguration
);
-
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
+ var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision);
- ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount);
+ ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
+ zonePartitionId,
+ partitionCount,
+ storageIndexTracker
+ );
return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
.thenCompose(v -> {
try {
+ // TODO Comment for reviewer. I assume that we should not aggregate (min/max) tableStorages.lastAppliedIndex().
+ // https://issues.apache.org/jira/browse/IGNITE-24517 will allow to init storageIndexTracker with the value
+ // from txStatePartitionStorage().lastAppliedIndex(), is that correct?
+ storageIndexTracker.update(zoneResources.txStatePartitionStorage().lastAppliedIndex(), null);
Review Comment:
This is correct, but I think that `storageIndexTracker` should not be
updated here; it should be updated inside `onSnapshotLoad` of the
corresponding Raft listener instead. Otherwise we may get inconsistencies on
recovery, depending on how this tracker is used.
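A minimal sketch of the suggested placement, under simplifying assumptions: the tracker is modelled as an `AtomicLong`, the storage as a stub holding only `lastAppliedIndex`, and `onWrite`/`onSnapshotLoad` have simplified signatures (the real Raft listener callbacks take different arguments). All class names are hypothetical. The point is that the listener, which is the component that changes the storage's applied index, is also the one that advances the tracker, including after a snapshot has been installed during recovery.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stub for the tx state storage: only lastAppliedIndex is modelled.
class TxStateStorageStub {
    private volatile long lastAppliedIndex;

    long lastAppliedIndex() {
        return lastAppliedIndex;
    }

    void lastApplied(long index) {
        lastAppliedIndex = index;
    }
}

// Hypothetical, simplified Raft listener. The tracker is advanced only at the points
// where the listener itself changes the storage's applied index, so the tracker only
// ever publishes indices that the storage already reflects.
class ZoneRaftListenerSketch {
    private final TxStateStorageStub storage;
    private final AtomicLong storageIndexTracker;

    ZoneRaftListenerSketch(TxStateStorageStub storage, AtomicLong storageIndexTracker) {
        this.storage = storage;
        this.storageIndexTracker = storageIndexTracker;
    }

    // Applying a command: persist the index first, then publish it.
    void onWrite(long commandIndex) {
        storage.lastApplied(commandIndex);
        storageIndexTracker.accumulateAndGet(commandIndex, Math::max);
    }

    // Called once a snapshot has been installed into the storage: the storage is now
    // the source of truth, so the tracker is advanced from its lastAppliedIndex.
    void onSnapshotLoad() {
        storageIndexTracker.accumulateAndGet(storage.lastAppliedIndex(), Math::max);
    }
}
```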
##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java:
##########
@@ -146,13 +151,16 @@ void setUp(
when(distributionZoneManager.dataNodes(anyLong(), anyInt(), anyInt())).thenReturn(completedFuture(Set.of(nodeName)));
- when(zoneResourcesManager.allocateZonePartitionResources(any(), anyInt()))
+ when(zoneResourcesManager.allocateZonePartitionResources(any(), anyInt(), any()))
.thenReturn(new ZonePartitionResources(
txStatePartitionStorage,
raftGroupListener,
partitionSnapshotStorageFactory
));
+ when(raftManager.startRaftGroupNode(any(), any(), any(), any(), (RaftGroupOptions) any(), any()))
Review Comment:
Why do you need the cast to `RaftGroupOptions`?
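For context, a cast like `(RaftGroupOptions) any()` is usually needed when the mocked method is overloaded: Mockito's `ArgumentMatchers.any()` is declared as `<T> T any()`, so if two overloads accept unrelated types at that position, the compiler can infer `T` for both and reports the call as ambiguous. Whether `startRaftGroupNode` actually has such an overload is an assumption here. The sketch reproduces the effect with hypothetical types and a hand-written `Matchers` class that only mimics Mockito's signatures. Note that in Mockito 2+, `any(Class)` does not match `null`, unlike plain `any()`.

```java
// Hypothetical types standing in for two unrelated parameter types of an overloaded method.
class GroupOptions {
}

class GroupOptionsConfigurer {
}

// Mimics the shapes of Mockito's ArgumentMatchers.any() and any(Class<T>).
// Real matchers also register themselves with Mockito; these just return null.
final class Matchers {
    private Matchers() {
    }

    static <T> T any() {
        return null;
    }

    static <T> T any(Class<T> type) {
        return null;
    }
}

// Hypothetical manager with two overloads that differ only in the second parameter.
class FakeRaftManager {
    String start(String nodeId, GroupOptions options) {
        return "options";
    }

    String start(String nodeId, GroupOptionsConfigurer configurer) {
        return "configurer";
    }
}

class OverloadDemo {
    // mgr.start("node", Matchers.any()) would not compile: T could be inferred as either
    // GroupOptions or GroupOptionsConfigurer, and neither overload is more specific.

    static String withCast(FakeRaftManager mgr) {
        // The cast fixes the argument type, which selects the first overload.
        return mgr.start("node", (GroupOptions) Matchers.any());
    }

    static String withTypedMatcher(FakeRaftManager mgr) {
        // Same effect without a cast: T is fixed by the Class argument.
        return mgr.start("node", Matchers.any(GroupOptions.class));
    }
}
```

If no such overload exists, the cast is redundant and plain `any()` compiles on its own.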
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -555,15 +561,28 @@ private CompletableFuture<?> createZonePartitionReplicationNode(
this::calculateZoneAssignments,
rebalanceRetryDelayConfiguration
);
-
Supplier<CompletableFuture<Boolean>> startReplicaSupplier = () -> {
+ var storageIndexTracker = new PendingComparableValuesTracker<Long, Void>(0L);
var eventParams = new LocalPartitionReplicaEventParameters(zonePartitionId, revision);
- ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(zonePartitionId, partitionCount);
+ ZonePartitionResources zoneResources = zoneResourcesManager.allocateZonePartitionResources(
+ zonePartitionId,
+ partitionCount,
+ storageIndexTracker
+ );
return fireEvent(LocalPartitionReplicaEvent.BEFORE_REPLICA_STARTED, eventParams)
.thenCompose(v -> {
try {
+ // TODO Comment for reviewer. I assume that we should not aggregate (min/max) tableStorages.lastAppliedIndex().
+ // https://issues.apache.org/jira/browse/IGNITE-24517 will allow to init storageIndexTracker with the value
+ // from txStatePartitionStorage().lastAppliedIndex(), is that correct?
+ storageIndexTracker.update(zoneResources.txStatePartitionStorage().lastAppliedIndex(), null);
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Properly close storageIndexTracker.
+ // internalTbl.updatePartitionTrackers is used in order to add storageIndexTracker to some context for further
+ // storage closing.
+ // internalTbl.updatePartitionTrackers(partId, safeTimeTracker, storageIndexTracker);
Review Comment:
This is confusing, why is this line commented out?
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -275,6 +281,14 @@ public void onConfigurationCommitted(RaftGroupConfiguration config, long lastApp
tableProcessors.values()
.forEach(listener -> listener.onConfigurationCommitted(config, lastAppliedIndex, lastAppliedTerm));
+
+ partitionSnapshots().acquireReadLock();
Review Comment:
Why do we need to acquire the snapshot lock here? As far as I understand, we
need this lock when processing commands that update the storage non-atomically,
so that concurrent snapshots cannot observe an intermediate storage state.
`updateTrackerIgnoringTrackerClosedException` does not access the storage at
all. However, as an optimization, we could take this lock once around the
`tableProcessors.values()` call above, because it is currently taken there
separately for every table processor.
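A minimal sketch of that optimization, assuming the snapshot lock behaves like a `java.util.concurrent.locks.ReentrantReadWriteLock` (the real `partitionSnapshots()` lock API is not reproduced here). `TableProcessorSketch` and `ZoneListenerLockSketch` are hypothetical. The read lock is held once across the loop, each processor's own acquisition becomes a re-entrant hold by the same thread, and the tracker update, which touches no storage, stays outside the lock.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical table processor that, like the real ones, takes the snapshot
// read lock itself while it updates its storage.
class TableProcessorSketch {
    private final ReentrantReadWriteLock snapshotLock;
    long lastAppliedIndex;
    int observedReadHolds;

    TableProcessorSketch(ReentrantReadWriteLock snapshotLock) {
        this.snapshotLock = snapshotLock;
    }

    void onConfigurationCommitted(long index) {
        snapshotLock.readLock().lock();
        try {
            // Read holds by this thread: 2 when the caller already holds the lock.
            observedReadHolds = snapshotLock.getReadHoldCount();
            lastAppliedIndex = index; // stands in for a non-atomic storage update
        } finally {
            snapshotLock.readLock().unlock();
        }
    }
}

class ZoneListenerLockSketch {
    private final ReentrantReadWriteLock snapshotLock;
    private final List<TableProcessorSketch> tableProcessors;
    final AtomicLong indexTracker = new AtomicLong();

    ZoneListenerLockSketch(ReentrantReadWriteLock snapshotLock, List<TableProcessorSketch> tableProcessors) {
        this.snapshotLock = snapshotLock;
        this.tableProcessors = tableProcessors;
    }

    void onConfigurationCommitted(long lastAppliedIndex) {
        // Held once across the whole loop; each processor's own acquisition
        // is then a re-entrant hold by the same thread.
        snapshotLock.readLock().lock();
        try {
            tableProcessors.forEach(p -> p.onConfigurationCommitted(lastAppliedIndex));
        } finally {
            snapshotLock.readLock().unlock();
        }

        // The tracker does not touch the storage, so it needs no snapshot lock.
        indexTracker.accumulateAndGet(lastAppliedIndex, Math::max);
    }
}
```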
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java:
##########
@@ -1443,6 +1452,35 @@ private <T> CompletableFuture<T> executeUnderZoneWriteLock(int zoneId, Supplier<
}
}
+ private CompletableFuture<Boolean> onPrimaryReplicaExpired(PrimaryReplicaEventParameters parameters) {
+ if (topologyService.localMember().id().equals(parameters.leaseholderId())) {
+ ZonePartitionId groupId = (ZonePartitionId) parameters.groupId();
+
+ replicaMgr.weakStopReplica(
Review Comment:
Please leave a comment explaining why we don't wait for this future (or at
least stating that this is intentional).
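A minimal sketch of how the intentional fire-and-forget could be made explicit while still logging failures. The wording of the comment is a placeholder (the actual reason is for the author to state), the stop call is a hypothetical `Supplier` rather than `replicaMgr.weakStopReplica(...)`, and returning `false` assumes the usual Ignite event-listener convention that `false` keeps the listener subscribed.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical, simplified handler; names and types do not match the real code.
class PrimaryReplicaExpiredSketch {
    private static final Logger LOG = Logger.getLogger(PrimaryReplicaExpiredSketch.class.getName());

    private final String localNodeId;
    private final Supplier<CompletableFuture<Void>> stopReplica;

    PrimaryReplicaExpiredSketch(String localNodeId, Supplier<CompletableFuture<Void>> stopReplica) {
        this.localNodeId = localNodeId;
        this.stopReplica = stopReplica;
    }

    CompletableFuture<Boolean> onPrimaryReplicaExpired(String leaseholderId) {
        if (localNodeId.equals(leaseholderId)) {
            // Intentionally not waiting for the stop future: <state the actual reason here>.
            // Failures are still logged so they are not silently swallowed.
            stopReplica.get().whenComplete((ignored, err) -> {
                if (err != null) {
                    LOG.log(Level.WARNING, "Failed to stop replica after primary replica expiration", err);
                }
            });
        }

        // Assumption: false keeps the listener subscribed.
        return CompletableFuture.completedFuture(false);
    }
}
```

The returned future completes immediately even if the stop never finishes, which is exactly the behavior the comment should justify.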
##########
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/ZonePartitionRaftListener.java:
##########
@@ -307,6 +321,10 @@ public void addTableProcessor(TablePartitionId tablePartitionId, RaftTableProces
currentCommitedConfiguration.lastAppliedIndex,
currentCommitedConfiguration.lastAppliedTerm
);
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24517 propagate lease information from txnStateStorage to newly added
Review Comment:
This will be fixed by https://github.com/apache/ignite-3/pull/5237 as well.
##########
modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/ZoneResourcesManagerTest.java:
##########
@@ -59,6 +60,9 @@ class ZoneResourcesManagerTest extends IgniteAbstractTest {
private ZoneResourcesManager manager;
+ // TODO https://issues.apache.org/jira/browse/IGNITE-24654 Ensure that tracker is closed.
+ private PendingComparableValuesTracker<Long, Void> storageIndexTracker;
Review Comment:
Why don't you close it in this test?
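A minimal sketch of closing the tracker in the test's teardown (with JUnit 5 this would be an `@AfterEach` method). `ClosableIndexTracker` is a hypothetical, simplified stand-in for `PendingComparableValuesTracker<Long, Void>`, assuming its `close()` fails every still-pending `waitFor` future; a tracker that is never closed can leave such futures hanging.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for PendingComparableValuesTracker<Long, Void>.
class ClosableIndexTracker implements AutoCloseable {
    private final TreeMap<Long, CompletableFuture<Void>> waiters = new TreeMap<>();
    private long current;

    ClosableIndexTracker(long initial) {
        this.current = initial;
    }

    // Completes once the tracked index reaches the given value.
    synchronized CompletableFuture<Void> waitFor(long index) {
        if (index <= current) {
            return CompletableFuture.completedFuture(null);
        }
        return waiters.computeIfAbsent(index, k -> new CompletableFuture<>());
    }

    synchronized void update(long newValue) {
        if (newValue <= current) {
            return;
        }
        current = newValue;
        // Detach every waiter whose index has been reached, then complete it.
        NavigableMap<Long, CompletableFuture<Void>> reached = waiters.headMap(newValue, true);
        List<CompletableFuture<Void>> ready = new ArrayList<>(reached.values());
        reached.clear();
        ready.forEach(f -> f.complete(null));
    }

    // Fails all pending waiters so nobody blocks forever on a closed tracker.
    @Override
    public synchronized void close() {
        List<CompletableFuture<Void>> pending = new ArrayList<>(waiters.values());
        waiters.clear();
        pending.forEach(f -> f.completeExceptionally(new IllegalStateException("Tracker is closed")));
    }
}

// Hypothetical test lifecycle: create the tracker per test and always close it afterwards.
class ZoneResourcesManagerTestSketch {
    ClosableIndexTracker storageIndexTracker;

    void setUp() {
        storageIndexTracker = new ClosableIndexTracker(0L);
    }

    void tearDown() {
        storageIndexTracker.close();
    }
}
```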
--
This is an automated message from the Apache Git Service.