rpuch commented on code in PR #1960:
URL: https://github.com/apache/ignite-3/pull/1960#discussion_r1172419950


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -101,8 +103,8 @@ public class InternalTableImpl implements InternalTable {
     /** Number of attempts. */
     private static final int ATTEMPTS_TO_ENLIST_PARTITION = 5;
 
-    /** Partition map. */
-    protected volatile Int2ObjectMap<RaftGroupService> partitionMap;
+    /** Map update guard by field {@link #updatePartitionMapsMux}. */

Review Comment:
   ```suggestion
       /** Map update guarded by field {@link #updatePartitionMapsMux}. */
   ```



##########
modules/core/src/main/java/org/apache/ignite/internal/util/TrackerClosedException.java:
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Exception that will be thrown when the {@link 
PendingComparableValuesTracker} is closed.
+ */
+public class TrackerClosedException extends RuntimeException {

Review Comment:
   Shouldn't it extend `IgniteException`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,8 +725,11 @@ private CompletableFuture<?> 
updateAssignmentInternal(ConfigurationNotificationE
 
                 placementDriver.updateAssignment(replicaGrpId, 
newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
 
-                PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-                PendingComparableValuesTracker<Long> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+                var safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
+                var storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+
+                ((InternalTableImpl) 
internalTbl).updatePartitionSafeTimeTracker(partId, safeTimeTracker);

Review Comment:
   Why is this cast necessary? Isn't it possible to add the methods to the 
interface?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -1054,6 +1061,16 @@ private void cleanUpTablesResources(Map<UUID, TableImpl> 
tables) {
 
                 CompletableFuture<Void> removeFromGcFuture = 
mvGc.removeStorage(replicationGroupId);
 
+                stopping.add(() -> {
+                    try {
+                        
closeTracker(internalTable.getPartitionSafeTimeTracker(partitionId));
+
+                        
closeTracker(internalTable.getPartitionStorageIndexTracker(partitionId));

Review Comment:
   Let's extract these lines into a method; they are duplicated a few times.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -1470,4 +1478,62 @@ private RuntimeException 
wrapReplicationException(Throwable e) {
 
         return e0;
     }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<HybridTimestamp> 
getPartitionSafeTimeTracker(int partitionId) {
+        return safeTimeTrackerByPartitionId.get(partitionId);
+    }
+
+    @Override
+    public @Nullable PendingComparableValuesTracker<Long> 
getPartitionStorageIndexTracker(int partitionId) {
+        return storageIndexTrackerByPartitionId.get(partitionId);
+    }
+
+    /**
+     * Updates the partition safe time tracker, if there was a previous one, 
closes it.
+     *
+     * @param partitionId Partition ID.
+     * @param newSafeTimeTracker New partition safe time tracker.
+     */
+    public void updatePartitionSafeTimeTracker(int partitionId, 
PendingComparableValuesTracker<HybridTimestamp> newSafeTimeTracker) {
+        PendingComparableValuesTracker<HybridTimestamp> 
previousSafeTimeTracker;
+
+        synchronized (updatePartitionMapsMux) {
+            Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> 
newSafeTimeTrackerMap = new Int2ObjectOpenHashMap<>(partitions);
+
+            newSafeTimeTrackerMap.putAll(safeTimeTrackerByPartitionId);
+
+            previousSafeTimeTracker = newSafeTimeTrackerMap.put(partitionId, 
newSafeTimeTracker);
+
+            safeTimeTrackerByPartitionId = newSafeTimeTrackerMap;
+        }
+
+        if (previousSafeTimeTracker != null) {
+            previousSafeTimeTracker.close();

Review Comment:
   Do we have a test that makes sure that replaced trackers are closed?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -465,4 +466,15 @@ void handleBuildIndexCommand(BuildIndexCommand cmd, long 
commandIndex, long comm
             );
         }
     }
+
+    private static <T extends Comparable<T>> void 
updateTrackerWithIgnoreTrackerClosedException(

Review Comment:
   I suggest renaming it to `updateTrackerIgnoringTrackerClosedException`



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -195,10 +196,10 @@ public void 
onWrite(Iterator<CommandClosure<WriteCommand>> iterator) {
 
                 assert safeTimePropagatingCommand.safeTime() != null;
 
-                
safeTime.update(safeTimePropagatingCommand.safeTime().asHybridTimestamp());
+                updateTrackerWithIgnoreTrackerClosedException(safeTime, 
safeTimePropagatingCommand.safeTime().asHybridTimestamp());
             }
 
-            storageIndexTracker.update(commandIndex);
+            updateTrackerWithIgnoreTrackerClosedException(storageIndexTracker, 
commandIndex);

Review Comment:
   `Replica#waitForActualState()` waits for the next index. It might happen that 
the `PartitionListener` actually executes the command and updates the index in 
the storage, but then fails to update the tracker (and notify the subscribers, 
including `Replica#waitForActualState()`). Would it be handled correctly in 
`Replica#waitForActualState()`? Maybe we need to call someone from the TX team.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -725,8 +725,11 @@ private CompletableFuture<?> 
updateAssignmentInternal(ConfigurationNotificationE
 
                 placementDriver.updateAssignment(replicaGrpId, 
newConfiguration.peers().stream().map(Peer::consistentId).collect(toList()));
 
-                PendingComparableValuesTracker<HybridTimestamp> safeTime = new 
PendingComparableValuesTracker<>(new HybridTimestamp(1, 0));
-                PendingComparableValuesTracker<Long> storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+                var safeTimeTracker = new PendingComparableValuesTracker<>(new 
HybridTimestamp(1, 0));
+                var storageIndexTracker = new 
PendingComparableValuesTracker<>(0L);
+
+                ((InternalTableImpl) 
internalTbl).updatePartitionSafeTimeTracker(partId, safeTimeTracker);
+                ((InternalTableImpl) 
internalTbl).updatePartitionStorageIndexTracker(partId, storageIndexTracker);

Review Comment:
   These 2 methods are always called together. Does it make sense to merge them 
into one?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java:
##########
@@ -128,15 +130,21 @@ public class InternalTableImpl implements InternalTable {
     /** Replica service. */
     private final ReplicaService replicaSvc;
 
-    /** Mutex for the partition map update. */
-    private final Object updatePartMapMux = new Object();
+    /** Mutex for the partition maps update. */
+    private final Object updatePartitionMapsMux = new Object();
 
     /** Table messages factory. */
     private final TableMessagesFactory tableMessagesFactory;
 
     /** A hybrid logical clock. */
     private final HybridClock clock;
 
+    /** Map update guard by field {@link #updatePartitionMapsMux}. */
+    private volatile 
Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> 
safeTimeTrackerByPartitionId = emptyMap();
+
+    /** Map update guard by field {@link #updatePartitionMapsMux}. */
+    private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>> 
storageIndexTrackerByPartitionId = emptyMap();

Review Comment:
   ```suggestion
       /** Map update guarded by field {@link #updatePartitionMapsMux}. */
       private volatile 
Int2ObjectMap<PendingComparableValuesTracker<HybridTimestamp>> 
safeTimeTrackerByPartitionId = emptyMap();
   
       /** Map update guarded by field {@link #updatePartitionMapsMux}. */
       private volatile Int2ObjectMap<PendingComparableValuesTracker<Long>> 
storageIndexTrackerByPartitionId = emptyMap();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to