timoninmaxim commented on code in PR #10506:
URL: https://github.com/apache/ignite/pull/10506#discussion_r1096540239
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java:
##########
@@ -1227,6 +1273,379 @@ private void finishCacheStart(UUID reqId, Map<UUID,
Boolean> res, Map<UUID, Exce
rollbackRestoreProc.start(reqId, reqId);
}
+ /**
+ * Destroys the caches listed in the current restore operation context.
+ * <p>
+ * When the local node is not the coordinator, a {@code GridFinishedFuture} created without a result is returned
+ * and nothing is destroyed. Otherwise the cache names are collected from {@code opCtx.cfgs} and passed to
+ * {@code dynamicDestroyCaches(stopCaches, false, false)}. The returned future yields {@code true} when the destroy
+ * future has no error; otherwise the error is rethrown wrapped by {@code F.wrap}.
+ *
+ * @param reqId Request ID. Not read by this method; the log message uses the ID from the operation context.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> cacheStop(UUID reqId) {
+ if (!U.isLocalNodeCoordinator(ctx.discovery()))
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ Collection<String> stopCaches = opCtx0.cfgs.values()
+ .stream()
+ .map(c -> c.config().getName())
+ .collect(Collectors.toSet());
+
+ if (log.isInfoEnabled())
+ log.info("Stopping caches [reqId=" + opCtx0.reqId + ", snp=" +
opCtx0.snpName + ", caches=" + stopCaches + ']');
+
+ // Skip deleting cache files as they will be removed during rollback.
+ return ctx.cache().dynamicDestroyCaches(stopCaches, false, false)
+ .chain(fut -> {
+ if (fut.error() != null)
+ throw F.wrap(fut.error());
+ else
+ return true;
+ });
+ }
+
+ /**
+ * Completion handler of the cache stop phase.
+ * <p>
+ * Does nothing on client nodes. When {@code errs} is not empty, the errors are logged. In either case the
+ * coordinator then starts {@code rollbackRestoreProc} with the given request ID, so reported errors do not
+ * prevent the rollback from being started.
+ *
+ * @param reqId Request ID.
+ * @param res Results. Not read by this method.
+ * @param errs Errors.
+ */
+ private void finishCacheStop(UUID reqId, Map<UUID, Boolean> res, Map<UUID,
Exception> errs) {
+ if (ctx.clientNode())
+ return;
+
+ if (!errs.isEmpty()) {
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ log.error("Failed to stop caches during a snapshot rollback
routine " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ",
err=" + errs + ']');
+ }
+
+ if (U.isLocalNodeCoordinator(ctx.discovery()))
+ rollbackRestoreProc.start(reqId, reqId);
+ }
+
+ /**
+ * Inits restoring incremental snapshot.
+ * <p>
+ * Client nodes return a {@code GridFinishedFuture} created without a result. Other nodes submit a task to the
+ * snapshot executor service. The task calls {@code walEnabled(false, cacheIds)}, then
+ * {@code restoreIncrementalSnapshot(...)}, then {@code walEnabled(true, cacheIds)}, and finally forces a new
+ * checkpoint. The returned future is completed with {@code true} when the checkpoint listener reports no error.
+ * It is completed with the checkpoint error otherwise, with a "Node is stopping." exception when no checkpoint
+ * progress is returned, or with any {@code Throwable} thrown by the task.
+ * <p>
+ * NOTE(review): {@code walEnabled(true, cacheIds)} is not in a {@code finally} block, so it is skipped when the
+ * restore throws - confirm that the rollback path covers this case.
+ *
+ * @param reqId Request ID. Not read by this method; the log message uses the ID from the operation context.
+ * @return Result future.
+ */
+ private IgniteInternalFuture<Boolean> incrementalSnapshotRestore(UUID
reqId) {
+ if (ctx.clientNode())
+ return new GridFinishedFuture<>();
+
+ SnapshotRestoreContext opCtx0 = opCtx;
+
+ if (log.isInfoEnabled()) {
+ log.info("Starting incremental snapshot restore operation " +
+ "[reqId=" + opCtx0.reqId + ", snapshot=" + opCtx0.snpName + ",
incIdx=" + opCtx0.incIdx +
+ ", caches=" + F.viewReadOnly(opCtx0.cfgs, c ->
c.config().getName()) + ']');
+ }
+
+ GridFutureAdapter<Boolean> res = new GridFutureAdapter<>();
+
+ ctx.pools().getSnapshotExecutorService().submit(() -> {
+ try {
+ Set<Integer> cacheIds = opCtx0.cfgs.keySet();
+
+ walEnabled(false, cacheIds);
+
+ restoreIncrementalSnapshot(opCtx0.snpName, opCtx0.snpPath,
cacheIds, opCtx0.incIdx);
+
+ walEnabled(true, cacheIds);
+
+ CheckpointProgress cp = ctx.cache().context().database()
+ .forceNewCheckpoint("Incremental snapshot restored.",
(fut) -> {
+ if (fut.error() != null)
+ res.onDone(fut.error());
+ else
+ res.onDone(true);
+ });
+
+ if (cp == null)
+ res.onDone(new IgniteCheckedException("Node is
stopping."));
+ }
+ catch (Throwable e) {
+ res.onDone(e);
+ }
+ });
+
+ return res;
+ }
+
+ /**
+ * Restore incremental snapshot.
+ *
+ * @param snpName Base snapshot name.
+ * @param snpPath Base snapshot path.
+ * @param cacheIds Restoring cache IDs.
+ * @param incIdx Index of incremental snapshot.
+ */
+ private void restoreIncrementalSnapshot(
+ String snpName,
+ String snpPath,
+ Set<Integer> cacheIds,
+ int incIdx
+ ) throws IgniteCheckedException, IOException {
+ File[] segments = walSegments(snpName, snpPath, incIdx);
+
+ UUID incSnpId = ctx.cache().context().snapshotMgr()
+ .readIncrementalSnapshotMetadata(snpName, snpPath, incIdx)
+ .requestId();
+
+ IncrementalSnapshotFinishRecord incSnpFinRec =
readFinishRecord(segments[segments.length - 1], incSnpId);
+
+ if (incSnpFinRec == null)
+ throw new IgniteCheckedException("System WAL records for
incremental snapshot wasn't found [id=" + incSnpId + ']');
+
+ CacheStripedExecutor exec = new
CacheStripedExecutor(ctx.pools().getStripedExecutorService());
+
+ UUID prevIncSnpId = incIdx > 1
+ ?
ctx.cache().context().snapshotMgr().readIncrementalSnapshotMetadata(snpName,
snpPath, incIdx - 1).requestId()
+ : null;
+
+ long start = U.currentTimeMillis();
+
+ LongAdder applied = new LongAdder();
+
+ try (WALIterator it = walIter(log, segments)) {
+ IgnitePredicate<GridCacheVersion> txVerFilter = null;
+
+ // Skips applying WAL until base snapshot record.
+ while (it.hasNext()) {
+ WALRecord rec = it.next().getValue();
+
+ if (rec.type() == CLUSTER_SNAPSHOT) {
+ ClusterSnapshotRecord snpRec = (ClusterSnapshotRecord)rec;
+
+ if (snpRec.clusterSnapshotName().equals(snpName)) {
+ if (prevIncSnpId == null)
+ txVerFilter = txVer ->
!incSnpFinRec.excluded().contains(txVer);
+
+ break;
+ }
+ }
+ }
+
+ // Apply incremental snapshots.
+ while (it.hasNext()) {
+ WALRecord rec = it.next().getValue();
+
+ if (rec.type() == INCREMENTAL_SNAPSHOT_START_RECORD) {
+ IncrementalSnapshotStartRecord startRec =
(IncrementalSnapshotStartRecord)rec;
+
+ if (startRec.id().equals(incSnpFinRec.id()))
+ txVerFilter = v -> incSnpFinRec.included().contains(v);
+ }
+ else if (rec.type() == INCREMENTAL_SNAPSHOT_FINISH_RECORD) {
+ IncrementalSnapshotFinishRecord finRec =
(IncrementalSnapshotFinishRecord)rec;
+
+ if (finRec.id().equals(prevIncSnpId))
+ txVerFilter = txVer ->
!incSnpFinRec.excluded().contains(txVer);
+ }
+ else if (rec.type() == DATA_RECORD_V2) {
+ DataRecord data = (DataRecord)rec;
+
+ DataEntry entry = data.writeEntries().get(0);
+
+ if (txVerFilter == null ||
txVerFilter.apply(entry.nearXidVersion())) {
+ for (DataEntry e: data.writeEntries()) {
+ // That is OK to restore only part of transaction
related to a specified cache group,
+ // because a full snapshot restoring does the same.
+ if (!cacheIds.contains(e.cacheId()))
Review Comment:
`txVerFilter` is `null` while intermediate incremental snapshots are applied.
For example, if a user asks to restore incIdx=3, Ignite doesn't filter entries
from incremental snapshots 1 and 2.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]