tkalkirill commented on code in PR #1280:
URL: https://github.com/apache/ignite-3/pull/1280#discussion_r1010469876


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -77,44 +73,34 @@ public PartitionSnapshotStorageFactory(
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             PartitionAccess partition,
-            List<String> peers,
-            List<String> learners,
             Executor incomingSnapshotsExecutor
     ) {
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.partition = partition;
-        this.peers = peers;
-        this.learners = learners;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
 
         // We must choose the minimum applied index for local recovery so that 
we don't skip the raft commands for the storage with the
         // lowest applied index and thus no data loss occurs.
-        persistedRaftIndex = Math.min(
-                partition.mvPartitionStorage().persistedIndex(),
-                partition.txStatePartitionStorage().persistedIndex()
+        lastIncludedRaftIndex = Math.min(
+                partition.mvPartitionStorage().lastAppliedIndex(),
+                partition.txStatePartitionStorage().lastAppliedIndex()

Review Comment:
   It should be the last persistent (saved) index.



##########
modules/raft/src/main/java/org/apache/ignite/raft/jraft/JRaftServiceFactory.java:
##########
@@ -42,9 +42,10 @@ public interface JRaftServiceFactory {
      *
      * @param uri The snapshot storage uri from {@link 
NodeOptions#getSnapshotUri()}
      * @param raftOptions the raft options.
+     * @param logManager the log manager.

Review Comment:
   Which log manager?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -133,23 +146,57 @@ public PartitionKey partitionKey() {
     }
 
     /**
-     * Freezes the scope of this snapshot.
+     * Freezes the scope of this snapshot. This includes taking snapshot 
metadata and opening TX data cursor.
      *
      * <p>Must be called under snapshot lock.
      */
     void freezeScope() {
         assert mvOperationsLock.isLocked() : "MV operations lock must be 
acquired!";
 
+        meta = takeSnapshotMeta();
+
         txDataCursor = partition.txStatePartitionStorage().scan();
     }
 
+    private SnapshotMeta takeSnapshotMeta() {
+        long lastAppliedIndex = Math.max(
+                partition.mvPartitionStorage().lastAppliedIndex(),
+                partition.txStatePartitionStorage().lastAppliedIndex()
+        );
+
+        return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, logManager);
+    }
+
     /**
-     * Reads a snapshot meta and returns a future with the response.
+     * Returns metadata corresponding to this snapshot.
      *
-     * @param metaRequest Meta request.
+     * @return This snapshot metadata.
      */
-    SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest 
metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
+    public SnapshotMeta meta() {
+        assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";
+
+        return meta;
+    }
+
+    /**
+     * Reads the snapshot meta and returns a future with the response.
+     *
+     * @param request Meta request.
+     */
+    @Nullable
+    SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest 
request) {
+        if (closed) {
+            return logAlreadyClosedAndReturnNull();
+        }
+
+        assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";
+
+        return MESSAGES_FACTORY.snapshotMetaResponse().meta(meta).build();
+    }
+
+    @Nullable
+    private <T> T logAlreadyClosedAndReturnNull() {

Review Comment:
   Maybe **Void** ?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -71,18 +76,23 @@ public class OutgoingSnapshot {
 
     private final ReusableLockLockup mvOperationsLockup = new 
ReusableLockLockup(mvOperationsLock);
 
+    /**
+     * Snapshot metadata taken on snapshot scope freezing.
+     */
+    private SnapshotMeta meta;

Review Comment:
   I suggest renaming it.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotAwarePartitionDataStorage.java:
##########
@@ -123,11 +127,21 @@ private void handleSnapshotInterference(RowId rowId) {
 
     @Override
     public void close() throws Exception {
-        // TODO: IGNITE-17935 - terminate all snapshots of this partition 
considering correct locking to do it consistently
+        cleanupSnapshots();
 
         partitionStorage.close();
     }
 
+    private void cleanupSnapshots() {
+        PartitionSnapshots partitionSnapshots = getPartitionSnapshots();
+
+        try (AutoLockup ignored = partitionSnapshots.acquireReadLock()) {

Review Comment:
   Let's change the name **ignored**.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/PartitionsSnapshots.java:
##########
@@ -30,4 +31,19 @@ public interface PartitionsSnapshots {
      * @return PartitionSnapshots instance.
      */
     PartitionSnapshots partitionSnapshots(PartitionKey partitionKey);
+
+    /**
+     * Removes the underlying collection for snapshots of this partition.
+     *
+     * @param partitionKey Partition key.
+     */
+    void removeSnapshots(PartitionKey partitionKey);
+

Review Comment:
   ```suggestion
   ```



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/IgniteJraftServiceFactory.java:
##########
@@ -38,7 +39,9 @@ public class IgniteJraftServiceFactory extends 
DefaultJRaftServiceFactory {
     private final LogStorageFactory logStorageFactory;
 
     /** Snapshot storage factory. */
-    private volatile SnapshotStorageFactory snapshotStorageFactory = 
LocalSnapshotStorage::new;
+    private volatile SnapshotStorageFactory snapshotStorageFactory = (path, 
raftOptions, logManager) -> {
+        return new LocalSnapshotStorage(path, raftOptions);
+    };

Review Comment:
   ```suggestion
       private volatile SnapshotStorageFactory snapshotStorageFactory = (path, 
raftOptions, logManager) -> new LocalSnapshotStorage(path, raftOptions);
   ```



##########
modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/SnapshotStorageFactory.java:
##########
@@ -28,7 +29,8 @@ public interface SnapshotStorageFactory {
      *
      * @param uri Snapshot URI.
      * @param raftOptions Raft options.
+     * @param logManager Log manager.

Review Comment:
   Which log manager?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/SnapshotMetaUtils.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing;
+
+import static java.util.stream.Collectors.toList;
+
+import java.util.Collection;
+import java.util.List;
+import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.conf.ConfigurationEntry;
+import org.apache.ignite.raft.jraft.entity.PeerId;
+import org.apache.ignite.raft.jraft.entity.RaftOutter.SnapshotMeta;
+import org.apache.ignite.raft.jraft.entity.SnapshotMetaBuilder;
+import org.apache.ignite.raft.jraft.storage.LogManager;
+
+/**
+ * Utils to build {@link SnapshotMeta} instances.
+ */
+public class SnapshotMetaUtils {
+    /**
+     * Builds a {@link SnapshotMeta} corresponding to RAFT state (term, 
configuration) at the given log index.
+     *
+     * @param logIndex RAFT log index.
+     * @param logManager LogManager from which to load term and configuration.
+     * @return SnapshotMeta corresponding to the given log index.
+     */
+    public static SnapshotMeta snapshotMetaAt(long logIndex, LogManager 
logManager) {
+        ConfigurationEntry configEntry = logManager.getConfiguration(logIndex);
+
+        SnapshotMetaBuilder metaBuilder = new 
RaftMessagesFactory().snapshotMeta()
+                .lastIncludedIndex(logIndex)
+                .lastIncludedTerm(logManager.getTerm(logIndex))
+                .peersList(peersToStrings(configEntry.getConf().listPeers()))
+                
.learnersList(peersToStrings(configEntry.getConf().listLearners()));
+
+        if (configEntry.getOldConf() != null) {
+            metaBuilder
+                    
.oldPeersList(peersToStrings(configEntry.getOldConf().listPeers()))
+                    
.oldLearnersList(peersToStrings(configEntry.getOldConf().listLearners()));
+        }
+
+        return metaBuilder.build();
+    }
+
+    private static Collection<String> peersToStrings(List<PeerId> peers) {
+        return peers.stream().map(PeerId::toString).collect(toList());
+    }
+
+    private SnapshotMetaUtils() {

Review Comment:
   It's not needed.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -133,23 +146,57 @@ public PartitionKey partitionKey() {
     }
 
     /**
-     * Freezes the scope of this snapshot.
+     * Freezes the scope of this snapshot. This includes taking snapshot 
metadata and opening TX data cursor.
      *
      * <p>Must be called under snapshot lock.
      */
     void freezeScope() {
         assert mvOperationsLock.isLocked() : "MV operations lock must be 
acquired!";
 
+        meta = takeSnapshotMeta();
+
         txDataCursor = partition.txStatePartitionStorage().scan();
     }
 
+    private SnapshotMeta takeSnapshotMeta() {
+        long lastAppliedIndex = Math.max(
+                partition.mvPartitionStorage().lastAppliedIndex(),
+                partition.txStatePartitionStorage().lastAppliedIndex()
+        );
+
+        return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, logManager);
+    }
+
     /**
-     * Reads a snapshot meta and returns a future with the response.
+     * Returns metadata corresponding to this snapshot.
      *
-     * @param metaRequest Meta request.
+     * @return This snapshot metadata.
      */
-    SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest 
metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
+    public SnapshotMeta meta() {
+        assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";
+
+        return meta;
+    }
+
+    /**
+     * Reads the snapshot meta and returns a future with the response.
+     *
+     * @param request Meta request.
+     */
+    @Nullable
+    SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest 
request) {
+        if (closed) {
+            return logAlreadyClosedAndReturnNull();
+        }
+
+        assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";

Review Comment:
   Can't there be a race?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -71,18 +76,23 @@ public class OutgoingSnapshot {
 
     private final ReusableLockLockup mvOperationsLockup = new 
ReusableLockLockup(mvOperationsLock);
 
+    /**
+     * Snapshot metadata taken on snapshot scope freezing.
+     */

Review Comment:
   ```suggestion
       /** Snapshot metadata taken on snapshot scope freezing. */
   ```



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionSnapshotStorageFactory.java:
##########
@@ -77,44 +73,34 @@ public PartitionSnapshotStorageFactory(
             TopologyService topologyService,
             OutgoingSnapshotsManager outgoingSnapshotsManager,
             PartitionAccess partition,
-            List<String> peers,
-            List<String> learners,
             Executor incomingSnapshotsExecutor
     ) {
         this.topologyService = topologyService;
         this.outgoingSnapshotsManager = outgoingSnapshotsManager;
         this.partition = partition;
-        this.peers = peers;
-        this.learners = learners;
         this.incomingSnapshotsExecutor = incomingSnapshotsExecutor;
 
         // We must choose the minimum applied index for local recovery so that 
we don't skip the raft commands for the storage with the
         // lowest applied index and thus no data loss occurs.
-        persistedRaftIndex = Math.min(
-                partition.mvPartitionStorage().persistedIndex(),
-                partition.txStatePartitionStorage().persistedIndex()
+        lastIncludedRaftIndex = Math.min(
+                partition.mvPartitionStorage().lastAppliedIndex(),
+                partition.txStatePartitionStorage().lastAppliedIndex()
         );
     }
 
     @Override
-    public PartitionSnapshotStorage createSnapshotStorage(String uri, 
RaftOptions raftOptions) {
-        SnapshotMeta snapshotMeta = new RaftMessagesFactory().snapshotMeta()
-                .lastIncludedIndex(persistedRaftIndex)
-                // According to the code of 
org.apache.ignite.raft.jraft.core.NodeImpl.bootstrap, it's "dangerous" to init 
term with a value
-                // greater than 1. 0 value of persisted index means that the 
underlying storage is empty.
-                .lastIncludedTerm(persistedRaftIndex > 0 ? 1 : 0)

Review Comment:
   I think we need to leave the definition of the term as it was done before, 
there is an explanation in the comment.
   
   As I understand it, then we do not need parameter `logManager`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -71,18 +76,23 @@ public class OutgoingSnapshot {
 
     private final ReusableLockLockup mvOperationsLockup = new 
ReusableLockLockup(mvOperationsLock);
 
+    /**
+     * Snapshot metadata taken on snapshot scope freezing.
+     */
+    private SnapshotMeta meta;
+
     /**
      * {@link RowId}s for which the corresponding rows were sent out of order 
(relative to the order in which this
      * snapshot sends rows), hence they must be skipped when sending rows 
normally.
      */
     private final Set<RowId> rowIdsToSkip = new ConcurrentHashSet<>();
 
-    // TODO: IGNITE-17935 - manage queue size
+    // TODO: IGNITE-18018 - manage queue size
     /**
      * Rows that need to be sent out of order (relative to the order in which 
this snapshot sends rows).
      * Versions inside rows are in oldest-to-newest order.
      */
-    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new LinkedList<>();
+    private final Queue<SnapshotMvDataResponse.ResponseEntry> outOfOrderMvData 
= new ArrayDeque<>();

Review Comment:
   I don't see where it is protected when calling method `java.util.Queue#poll`.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshot.java:
##########
@@ -133,23 +146,57 @@ public PartitionKey partitionKey() {
     }
 
     /**
-     * Freezes the scope of this snapshot.
+     * Freezes the scope of this snapshot. This includes taking snapshot 
metadata and opening TX data cursor.
      *
      * <p>Must be called under snapshot lock.
      */
     void freezeScope() {
         assert mvOperationsLock.isLocked() : "MV operations lock must be 
acquired!";
 
+        meta = takeSnapshotMeta();
+
         txDataCursor = partition.txStatePartitionStorage().scan();
     }
 
+    private SnapshotMeta takeSnapshotMeta() {
+        long lastAppliedIndex = Math.max(
+                partition.mvPartitionStorage().lastAppliedIndex(),
+                partition.txStatePartitionStorage().lastAppliedIndex()
+        );
+
+        return SnapshotMetaUtils.snapshotMetaAt(lastAppliedIndex, logManager);
+    }
+
     /**
-     * Reads a snapshot meta and returns a future with the response.
+     * Returns metadata corresponding to this snapshot.
      *
-     * @param metaRequest Meta request.
+     * @return This snapshot metadata.
      */
-    SnapshotMetaResponse handleSnapshotMetaRequest(SnapshotMetaRequest 
metaRequest) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
+    public SnapshotMeta meta() {
+        assert meta != null : "No snapshot meta yet, probably the snapshot 
scope was not yet frozen";

Review Comment:
   Can't there be a race?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to