timoninmaxim commented on code in PR #10263:
URL: https://github.com/apache/ignite/pull/10263#discussion_r989753694
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotSelfTest.java:
##########
@@ -527,21 +527,31 @@ public void testBltChangeDuringClusterSnapshot() throws Exception {
/** @throws Exception If fails. */
@Test
public void testClusterSnapshotExOnInitiatorLeft() throws Exception {
- IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE);
+ for (boolean inc : new boolean[] {false, true}) {
Review Comment:
I'd suggest splitting this into 2 independent tests, because the test for
incremental snapshots requires an additional `if` and additional cleanup that
is already implemented in `AbstractSnapshotSelfTest`.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java:
##########
@@ -205,10 +204,16 @@ public static StoredCacheData readCacheData(
Marshaller marshaller,
IgniteConfiguration cfg
) throws IgniteCheckedException {
- try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) {
+ try (InputStream stream = new BufferedInputStream(Files.newInputStream(conf.toPath()))) {
+ if (marshaller == null || cfg == null) {
Review Comment:
`marshaller` and `cfg` are now nullable. Let's add the `@Nullable` annotation
to the method params and describe when they can be `null`.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -308,6 +309,15 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** Total snapshot files count which receiver should expect to receive. */
private static final String SNP_PARTITIONS_CNT = "partsCnt";
+ /** Incremental snapshots directory name. */
+ public static final String INC_SNP_DIR = "increments";
+
+ /** Count of numbers in incremental snapshot directory. */
+ public static final int INC_DIR_LENGTH = 4;
Review Comment:
I'd suggest increasing this number to make it similar to WAL segment file
names. We might not fully understand the usage pattern of increments yet. 4
looks OK, but why not 3 or 5? I suppose there is no need to limit it at all -
let's allow the full range of `long` values.
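
A minimal sketch of what I mean, assuming the increment index becomes a `long`
and the directory name is zero-padded to 16 digits (the width I believe WAL
segment file names use). The class and method names are hypothetical, not part
of this PR:

```java
/** Hypothetical helper: builds a zero-padded directory name for an increment index. */
final class IncrementDirNames {
    /** Assumed width, chosen to mirror WAL segment file names. */
    static final int INC_DIR_LENGTH = 16;

    /**
     * @param incIdx Non-negative increment index.
     * @return Index left-padded with zeros to at least INC_DIR_LENGTH characters.
     */
    static String incrementDirName(long incIdx) {
        if (incIdx < 0)
            throw new IllegalArgumentException("Negative increment index: " + incIdx);

        // Zero padding keeps names in index order under a plain lexicographic sort,
        // as long as the index fits into INC_DIR_LENGTH digits.
        return String.format("%0" + INC_DIR_LENGTH + "d", incIdx);
    }
}
```

With this, `incrementDirName(1)` is fifteen zeros followed by `1`, and the
index type no longer caps the number of increments at 9999.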
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -773,6 +790,108 @@ private IgniteInternalFuture<SnapshotOperationResponse>
initLocalSnapshotStartSt
"on the local node [missed=" + leftGrps + ", nodeId=" +
cctx.localNodeId() + ']'));
}
+ if (req.incremental()) {
+ try {
+ checkIncrementalCanBeCreated(req.snapshotName(),
req.snapshotPath());
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(e);
+ }
+
+ return initLocalIncrementalSnapshot(req);
+ }
+ else
+ return initLocalFullSnapshot(req, grpIds, withMetaStorage);
+ }
+
+ /**
+ * @param req Request on snapshot creation.
+ * @return Future which will be completed when a snapshot has been started.
+ */
+ private IgniteInternalFuture<SnapshotOperationResponse> initLocalIncrementalSnapshot(SnapshotOperationRequest req) {
+ SnapshotMetadata meta = readSnapshotMetadata(new File(
Review Comment:
It's already read in `checkIncrementalCanBeCreated`, let's reuse it?
##########
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IncrementalSnapshotTest.java:
##########
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.util.function.UnaryOperator;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.util.typedef.internal.CU;
+import org.junit.Test;
+
+import static org.apache.ignite.testframework.GridTestUtils.assertThrows;
+import static org.apache.ignite.testframework.GridTestUtils.assertThrowsWithCause;
+
+/**
+ * Basic tests for incremental snapshots.
+ */
+public class IncrementalSnapshotTest extends AbstractSnapshotSelfTest {
+ /** */
+ public static final int GRID_CND = 3;
Review Comment:
s/CND/CNT/
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -704,6 +689,38 @@ public File snapshotLocalDir(String snpName, @Nullable
String snpPath) {
return snpPath == null ? new File(locSnpDir, snpName) : new
File(snpPath, snpName);
}
+ /**
+ * Returns path to specific incremental snapshot.
+ * For example, {@code "work/snapshots/mybackup/increments/node01/0001"}.
+ *
+ * @param snpName Snapshot name.
+ * @param snpPath Snapshot directory path.
+ * @param incIdx Increment index.
+ * @return Local snapshot directory where snapshot files are located.
+ */
+ public File incrementalSnapshotLocalDir(String snpName, @Nullable String
snpPath, int incIdx) {
+ return Paths.get(
+ incrementalSnapshotsLocalRootDir(snpName,
snpPath).getAbsolutePath(),
+ IgniteUtils.fixedLengthNumberName(incIdx, INC_DIR_LENGTH, null)
+ ).toFile();
+ }
+
+ /**
+ * Returns root folder for incremental snapshot.
+ * For example, {@code "work/snapshots/mybackup/increments/node01"}.
+ *
+ * @param snpName Snapshot name.
+ * @param snpPath Snapshot directory path.
+ * @return Local snapshot directory where snapshot files are located.
+ */
+ public File incrementalSnapshotsLocalRootDir(String snpName, @Nullable String snpPath) {
Review Comment:
The same here, let's use the already resolved `snpPath` as a single param.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java:
##########
@@ -205,10 +204,16 @@ public static StoredCacheData readCacheData(
Marshaller marshaller,
IgniteConfiguration cfg
) throws IgniteCheckedException {
- try (InputStream stream = new BufferedInputStream(new FileInputStream(conf))) {
+ try (InputStream stream = new BufferedInputStream(Files.newInputStream(conf.toPath()))) {
+ if (marshaller == null || cfg == null) {
Review Comment:
Actually, I see that they can be `null` only for `IndexReader`. Why is this
change required for this PR?
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -1109,20 +1243,49 @@ public Set<UUID> cacheStartRequiredAliveNodes(@Nullable
IgniteUuid restoreId) {
/**
* @return List of all known snapshots on the local node.
*/
- public List<String> localSnapshotNames() {
+ public List<String> localSnapshotNames(@Nullable String snpPath) {
Review Comment:
Maybe it's better to make `localSnapshotNames` static and pass `snpPath`
as an already resolved param.
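
Roughly what I have in mind, as a sketch only: the caller resolves the snapshot
root (default directory or user-supplied path) and the static method just lists
it. The class name and the `File` parameter type are my assumptions, and the
sorting is added only to make the result deterministic:

```java
import java.io.File;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical holder for the static variant of localSnapshotNames. */
final class SnapshotDirs {
    /**
     * @param snpRoot Already resolved snapshot root directory.
     * @return Names of all sub-directories of the root, sorted by name.
     */
    static List<String> localSnapshotNames(File snpRoot) {
        // listFiles returns null if the path is not a directory or cannot be read.
        File[] dirs = snpRoot.listFiles(File::isDirectory);

        if (dirs == null)
            return Collections.emptyList();

        return Arrays.stream(dirs)
            .map(File::getName)
            .sorted()
            .collect(Collectors.toList());
    }
}
```

The `snpPath == null ? locSnpDir : new File(snpPath)` decision then lives in
one place at the call site instead of being repeated in every helper.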
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -2178,6 +2404,118 @@ private IgniteFuture<Boolean>
executeRestoreManagementTask(
return new
IgniteFutureImpl<>(cctx.kernalContext().task().execute(taskCls, snpName));
}
+ /**
+ * Checks that incremental snapshot can be created for given full snapshot
and current cluster state.
+ *
+ * @param name Full snapshot name.
+ * @param snpPath Snapshot path.
+ */
+ private void checkIncrementalCanBeCreated(String name, @Nullable String
snpPath) throws IgniteCheckedException {
+ File snpDir = snapshotLocalDir(name, snpPath);
+
+ SnapshotMetadata meta = readSnapshotMetadata(new File(
+ snpDir,
+ snapshotMetaFileName(cctx.localNode().consistentId().toString())
+ ));
+
+ Set<String> aliveNodesConsIds = cctx.discovery().aliveServerNodes()
+ .stream()
+ .map(node -> node.consistentId().toString())
+ .collect(Collectors.toSet());
+
+ for (String consId : meta.baselineNodes()) {
+ if (!aliveNodesConsIds.contains(consId)) {
+ throw new IgniteCheckedException("Create incremental snapshot
request has been rejected. " +
+ "Node from full snapshot offline [consistentId=" + consId
+ ']');
+ }
+ }
+
+ File rootSnpCachesDir = new File(snpDir,
databaseRelativePath(meta.folderName()));
+
+ for (int grpId : meta.cacheGroupIds()) {
+ if (grpId == METASTORAGE_CACHE_ID)
+ continue;
+
+ CacheGroupContext gctx =
cctx.kernalContext().cache().cacheGroup(grpId);
+
+ if (gctx == null) {
+ throw new IgniteCheckedException("Create incremental snapshot
request has been rejected. " +
+ "Cache group destroyed [groupId=" + grpId + ']');
+ }
+
+ List<File> snpCacheDir =
+ cacheDirectories(rootSnpCachesDir, grpName ->
gctx.cacheOrGroupName().equals(grpName));
+
+ if (snpCacheDir.isEmpty()) {
+ throw new IgniteCheckedException("Create incremental snapshot
request has been rejected. " +
+ "Cache group directory not found [groupId=" + grpId + ']');
+ }
+
+ assert snpCacheDir.size() == 1 : "Single snapshot cache directory
must be found";
+
+ JdkMarshaller marsh =
MarshallerUtils.jdkMarshaller(cctx.kernalContext().igniteInstanceName());
+
+ for (File snpDataFile :
FilePageStoreManager.cacheDataFiles(snpCacheDir.get(0))) {
+ StoredCacheData snpCacheData =
GridLocalConfigManager.readCacheData(
+ snpDataFile,
+ marsh,
+ cctx.kernalContext().config()
+ );
+
+ File nodeDataFile = new
File(snpDataFile.getAbsolutePath().replace(
+ rootSnpCachesDir.getAbsolutePath(),
+ pdsSettings.persistentStoreNodePath().getAbsolutePath()
+ ));
+
+ if (!nodeDataFile.exists()) {
+ throw new IgniteCheckedException("Create incremental
snapshot request has been rejected. " +
+ "Cache destroyed [cacheId=" + snpCacheData.cacheId() +
+ ", cacheName=" + snpCacheData.config().getName() +
']');
+ }
+
+ StoredCacheData nodeCacheData =
GridLocalConfigManager.readCacheData(
+ nodeDataFile,
+ marsh,
+ cctx.gridConfig()
+ );
+
+ addEncryptionInfoIfEnabled(nodeCacheData);
Review Comment:
Let's raise an exception if encryption is enabled, and forbid incremental
snapshots.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -1109,20 +1243,49 @@ public Set<UUID> cacheStartRequiredAliveNodes(@Nullable
IgniteUuid restoreId) {
/**
* @return List of all known snapshots on the local node.
*/
- public List<String> localSnapshotNames() {
+ public List<String> localSnapshotNames(@Nullable String snpPath) {
if (cctx.kernalContext().clientNode())
throw new UnsupportedOperationException("Client and daemon nodes
can not perform this operation.");
if (locSnpDir == null)
return Collections.emptyList();
synchronized (snpOpMux) {
- return Arrays.stream(locSnpDir.listFiles(File::isDirectory))
+ File[] dirs = (snpPath == null ? locSnpDir : new
File(snpPath)).listFiles(File::isDirectory);
+
+ if (dirs == null)
+ return Collections.emptyList();
+
+ return Arrays.stream(dirs)
.map(File::getName)
.collect(Collectors.toList());
}
}
+ /**
+ * @param snpName Full snapshot name.
+ * @param snpPath Snapshot path.
+ * @return Maximum existing incremental snapshot index.
+ */
+ private int maxLocalIncrementSnapshot(String snpName, @Nullable String snpPath) {
Review Comment:
Let's resolve and validate `snpPath` before invoking the method (actually
it's already checked in `createSnapshot`). Then this method should receive
only a single param, `snpPath`.
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java:
##########
@@ -1678,7 +1872,10 @@ public IgniteFutureImpl<Void> restoreSnapshot(String
name, @Nullable String snpP
for (File tmp : snapshotTmpDir().listFiles())
U.delete(tmp);
- deleteSnapshot(snpDir, pdsSettings.folderName());
+ if (INC_SNP_NAME_PATTERN.matcher(snpDir.getName()).matches())
Review Comment:
This looks unreliable, as a user might create an ordinary snapshot with such a
name. Let's also check the directory name (it must contain "increments").
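
A sketch of the stricter check, assuming the layout from the Javadoc in this PR
(`.../increments/node01/0001`) and assuming `INC_SNP_NAME_PATTERN` matches
exactly four digits. The pattern definition and the method name below are my
guesses, not the PR's code:

```java
import java.io.File;
import java.util.regex.Pattern;

/** Hypothetical check that a directory really is an incremental snapshot directory. */
final class IncrementalDirCheck {
    /** Incremental snapshots directory name, as in the PR. */
    static final String INC_SNP_DIR = "increments";

    /** Assumed pattern: a name of exactly four digits, e.g. "0001". */
    static final Pattern INC_SNP_NAME_PATTERN = Pattern.compile("\\d{4}");

    static boolean isIncrementalSnapshotDir(File dir) {
        if (!INC_SNP_NAME_PATTERN.matcher(dir.getName()).matches())
            return false;

        // Expected layout: <snapshot>/increments/<node folder>/<index>.
        File nodeDir = dir.getParentFile();
        File incRoot = nodeDir == null ? null : nodeDir.getParentFile();

        return incRoot != null && INC_SNP_DIR.equals(incRoot.getName());
    }
}
```

An ordinary snapshot that a user happened to name `0001` sits directly under
the snapshots root, so it fails the second condition and is deleted through the
regular path.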
##########
modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridLocalConfigManager.java:
##########
@@ -355,9 +360,18 @@ public CacheJoinNodeDiscoveryData
restoreCacheConfigurations() throws IgniteChec
* @param lsnr Instance of listener to add.
*/
public void addConfigurationChangeListener(BiConsumer<String, File> lsnr) {
- assert chgLock.isWriteLockedByCurrentThread();
+ if (chgLock.isWriteLockedByCurrentThread())
Review Comment:
AFAIU, it's safe to acquire a write lock even if the current thread already
holds it. Do you check this condition for performance reasons?
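
To illustrate the reentrancy, here is a small standalone sketch. It assumes
`chgLock` is a `java.util.concurrent.locks.ReentrantReadWriteLock` (the
`isWriteLockedByCurrentThread()` call suggests so); the class and method names
are made up for the example:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical demo of nested write-lock acquisition on a single thread. */
final class ReentrantWriteLockDemo {
    /**
     * Acquires the write lock twice on the calling thread.
     *
     * @return Write hold count observed inside the nested section.
     */
    static int nestedWriteHolds(ReentrantReadWriteLock lock) {
        lock.writeLock().lock();

        try {
            // Second acquisition by the owning thread does not block.
            lock.writeLock().lock();

            try {
                return lock.getWriteHoldCount();
            }
            finally {
                lock.writeLock().unlock();
            }
        }
        finally {
            lock.writeLock().unlock();
        }
    }
}
```

Each `lock()` needs a matching `unlock()`, so the method could acquire the
write lock unconditionally in a `try`/`finally` without the
`isWriteLockedByCurrentThread()` branch.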