nizhikov commented on code in PR #10506:
URL: https://github.com/apache/ignite/pull/10506#discussion_r1095854564


##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java:
##########
@@ -1227,6 +1273,379 @@ private void finishCacheStart(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exce
             rollbackRestoreProc.start(reqId, reqId);
     }
 
+    /**
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> cacheStop(UUID reqId) {
+        if (!U.isLocalNodeCoordinator(ctx.discovery()))
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        Collection<String> stopCaches = opCtx0.cfgs.values()
+            .stream()
+            .map(c -> c.config().getName())
+            .collect(Collectors.toSet());
+
+        if (log.isInfoEnabled())
+            log.info("Stopping caches [reqId=" + opCtx0.reqId + ", snp=" + 
opCtx0.snpName + ", caches=" + stopCaches + ']');
+
+        // Skip deleting cache files as they will be removed during rollback.
+        return ctx.cache().dynamicDestroyCaches(stopCaches, false, false)
+            .chain(fut -> {
+                if (fut.error() != null)
+                    throw F.wrap(fut.error());
+                else
+                    return true;
+            });
+    }
+
+    /**
+     * @param reqId Request ID.
+     * @param res Results.
+     * @param errs Errors.
+     */
+    private void finishCacheStop(UUID reqId, Map<UUID, Boolean> res, Map<UUID, Exception> errs) {
+        if (ctx.clientNode())
+            return;
+
+        if (!errs.isEmpty()) {
+            SnapshotRestoreContext opCtx0 = opCtx;
+
+            log.error("Failed to stop caches during a snapshot rollback 
routine " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ", 
err=" + errs + ']');
+        }
+
+        if (U.isLocalNodeCoordinator(ctx.discovery()))
+            rollbackRestoreProc.start(reqId, reqId);
+    }
+
+    /**
+     * Inits restoring incremental snapshot.
+     *
+     * @param reqId Request ID.
+     * @return Result future.
+     */
+    private IgniteInternalFuture<Boolean> incrementalSnapshotRestore(UUID reqId) {
+        if (ctx.clientNode())
+            return new GridFinishedFuture<>();
+
+        SnapshotRestoreContext opCtx0 = opCtx;
+
+        if (log.isInfoEnabled()) {
+            log.info("Starting incremental snapshot restore operation " +
+                "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ", 
incIdx=" + opCtx0.incIdx +
+                ", caches=" + F.viewReadOnly(opCtx0.cfgs, c -> 
c.config().getName()) + ']');
+        }
+
+        GridFutureAdapter<Boolean> res = new GridFutureAdapter<>();
+
+        ctx.pools().getSnapshotExecutorService().submit(() -> {
+            try {
+                Set<Integer> cacheIds = opCtx0.cfgs.keySet();
+
+                walEnabled(false, cacheIds);
+
+                restoreIncrementalSnapshot(opCtx0.snpName, opCtx0.snpPath, cacheIds, opCtx0.incIdx);
+
+                walEnabled(true, cacheIds);
+
+                CheckpointProgress cp = ctx.cache().context().database()
+                    .forceNewCheckpoint("Incremental snapshot restored.", (fut) -> {
+                        if (fut.error() != null)
+                            res.onDone(fut.error());
+                        else
+                            res.onDone(true);
+                    });
+
+                if (cp == null)
+                    res.onDone(new IgniteCheckedException("Node is 
stopping."));
+            }
+            catch (Throwable e) {
+                res.onDone(e);
+            }
+        });
+
+        return res;
+    }
+
+    /**
+     * Restore incremental snapshot.
+     *
+     * @param snpName Base snapshot name.
+     * @param snpPath Base snapshot path.
+     * @param cacheIds Restoring cache IDs.
+     * @param incIdx Index of incremental snapshot.
+     */
+    private void restoreIncrementalSnapshot(
+        String snpName,
+        String snpPath,
+        Set<Integer> cacheIds,
+        int incIdx
+    ) throws IgniteCheckedException, IOException {
+        File[] segments = walSegments(snpName, snpPath, incIdx);
+
+        UUID incSnpId = ctx.cache().context().snapshotMgr()
+            .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx)
+            .requestId();
+
+        IncrementalSnapshotFinishRecord incSnpFinRec = readFinishRecord(segments[segments.length - 1], incSnpId);
+
+        if (incSnpFinRec == null)
+            throw new IgniteCheckedException("System WAL records for 
incremental snapshot wasn't found [id=" + incSnpId + ']');
+
+        CacheStripedExecutor exec = new CacheStripedExecutor(ctx.pools().getStripedExecutorService());
+
+        UUID prevIncSnpId = incIdx > 1
+            ? ctx.cache().context().snapshotMgr().readIncrementalSnapshotMetadata(snpName, snpPath, incIdx - 1).requestId()
+            : null;
+
+        long start = U.currentTimeMillis();
+
+        LongAdder applied = new LongAdder();
+
+        try (WALIterator it = walIter(log, segments)) {
+            IgnitePredicate<GridCacheVersion> txVerFilter = null;
+
+            // Skips applying WAL until base snapshot record.
+            while (it.hasNext()) {
+                WALRecord rec = it.next().getValue();
+
+                if (rec.type() == CLUSTER_SNAPSHOT) {
+                    ClusterSnapshotRecord snpRec = (ClusterSnapshotRecord)rec;
+
+                    if (snpRec.clusterSnapshotName().equals(snpName)) {
+                        if (prevIncSnpId == null)
+                            txVerFilter = txVer -> !incSnpFinRec.excluded().contains(txVer);
+
+                        break;
+                    }
+                }
+            }
+
+            // Apply incremental snapshots.
+            while (it.hasNext()) {
+                WALRecord rec = it.next().getValue();
+
+                if (rec.type() == INCREMENTAL_SNAPSHOT_START_RECORD) {
+                    IncrementalSnapshotStartRecord startRec = (IncrementalSnapshotStartRecord)rec;
+
+                    if (startRec.id().equals(incSnpFinRec.id()))
+                        txVerFilter = v -> incSnpFinRec.included().contains(v);
+                }
+                else if (rec.type() == INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
+                    IncrementalSnapshotFinishRecord finRec = (IncrementalSnapshotFinishRecord)rec;
+
+                    if (finRec.id().equals(prevIncSnpId))
+                        txVerFilter = txVer -> !incSnpFinRec.excluded().contains(txVer);
+                }
+                else if (rec.type() == DATA_RECORD_V2) {
+                    DataRecord data = (DataRecord)rec;
+
+                    DataEntry entry = data.writeEntries().get(0);

Review Comment:
   Can we really filter ALL `DataEntry` items of a record based on the first one?
   Even if this works for now, let's filter each `DataEntry` one by one.
   It won't hurt performance (AFAIK multi-`DataEntry` records exist only on backup nodes when a transaction is committed), because the `DataRecord` is already in memory.
   
   But it will keep the logic correct even after future changes.
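   
   A minimal sketch of what per-entry filtering could look like, assuming the `cacheIds` and `txVerFilter` variables from the surrounding loop; `applyDataEntry` is a hypothetical placeholder for whatever the patch does with a matched entry:
   
   ```java
   else if (rec.type() == DATA_RECORD_V2) {
       DataRecord data = (DataRecord)rec;
   
       // Check every entry of the record on its own instead of deciding by the first one.
       for (DataEntry entry : data.writeEntries()) {
           // Skip entries of caches that are not being restored.
           if (!cacheIds.contains(entry.cacheId()))
               continue;
   
           // Skip entries of transactions filtered out for this incremental snapshot.
           if (txVerFilter != null && !txVerFilter.apply(entry.nearXidVersion()))
               continue;
   
           applyDataEntry(entry); // Hypothetical helper that applies the entry to the restored cache.
       }
   }
   ```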



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
