This is an automated email from the ASF dual-hosted git repository.
namelchev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 78a3a9b7f2a IGNITE-27566 Refactor SnapshotOperationRequest class to be
stateless for serialization framework (#12647)
78a3a9b7f2a is described below
commit 78a3a9b7f2a95481556b0041c8e540a882664e77
Author: Nikita Amelchev <[email protected]>
AuthorDate: Fri Jan 23 08:37:24 2026 +0300
IGNITE-27566 Refactor SnapshotOperationRequest class to be stateless for
serialization framework (#12647)
---
.../snapshot/AbstractSnapshotOperationRequest.java | 2 -
.../snapshot/IgniteSnapshotManager.java | 216 ++++++++++++---------
.../snapshot/SnapshotCheckProcessRequest.java | 4 +-
.../persistence/snapshot/SnapshotOperation.java | 115 +++++++++--
.../snapshot/SnapshotOperationEndRequest.java | 78 ++++++++
.../snapshot/SnapshotOperationRequest.java | 120 +-----------
.../main/resources/META-INF/classnames.properties | 1 -
7 files changed, 303 insertions(+), 233 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
index 2e0a5ce64a3..3a161664a54 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotOperationRequest.java
@@ -63,7 +63,6 @@ abstract class AbstractSnapshotOperationRequest implements
Serializable {
* @param snpName Snapshot name.
* @param snpPath Snapshot directory path.
* @param grps Collection of cache group names.
- * @param incIdx Incremental snapshot index.
* @param nodes IDs of the nodes that must be alive to complete the
operation.
*/
protected AbstractSnapshotOperationRequest(
@@ -71,7 +70,6 @@ abstract class AbstractSnapshotOperationRequest implements
Serializable {
String snpName,
String snpPath,
@Nullable Collection<String> grps,
- int incIdx,
Collection<UUID> nodes
) {
this.reqId = reqId;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
index e1c0cc4e30f..25624fe878c 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java
@@ -344,7 +344,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
private final SnapshotCheckProcess checkSnpProc;
/** Check previously performed snapshot operation and delete uncompleted
files if we need. */
- private final DistributedProcess<SnapshotOperationRequest,
SnapshotOperationResponse> endSnpProc;
+ private final DistributedProcess<SnapshotOperationEndRequest,
SnapshotOperationResponse> endSnpProc;
/** Marshaller. */
private final Marshaller marsh;
@@ -386,7 +386,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
private ClusterSnapshotFuture clusterSnpFut;
/** Current snapshot operation on local node. */
- private volatile SnapshotOperationRequest clusterSnpReq;
+ private volatile SnapshotOperation curSnpOp;
/** {@code true} if recovery process occurred for snapshot. */
private volatile boolean recovered;
@@ -569,21 +569,24 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
UUID leftNodeId = evt.eventNode().id();
if (evt.type() == EVT_NODE_LEFT || evt.type() ==
EVT_NODE_FAILED) {
- SnapshotOperationRequest snpReq = clusterSnpReq;
+ SnapshotOperation snpOp = curSnpOp;
String err = "Snapshot operation interrupted, because
baseline node left the cluster: " + leftNodeId;
- boolean reqNodeLeft = snpReq != null &&
snpReq.nodes().contains(leftNodeId);
+ boolean reqNodeLeft = snpOp != null &&
snpOp.request().nodes().contains(leftNodeId);
// If the coordinator left the cluster and did not start
// the final snapshot phase (SNAPSHOT_END), we start it
from a new one.
- if (reqNodeLeft && snpReq.startStageEnded() &&
U.isLocalNodeCoordinator(ctx.discovery())) {
- snpReq.error(new ClusterTopologyCheckedException(err));
+ if (reqNodeLeft && snpOp.startStageEnded() &&
U.isLocalNodeCoordinator(ctx.discovery())) {
+ snpOp.error(new ClusterTopologyCheckedException(err));
- endSnpProc.start(snpReq.requestId(), snpReq);
+ endSnpProc.start(
+ snpOp.request().requestId(),
+ new
SnapshotOperationEndRequest(snpOp.request().requestId(), snpOp.error(), null)
+ );
}
for (AbstractSnapshotFutureTask<?> sctx :
locSnpTasks.values()) {
if (sctx.sourceNodeId().equals(leftNodeId) ||
- (reqNodeLeft &&
snpReq.snapshotName().equals(sctx.snapshotName())))
+ (reqNodeLeft &&
snpOp.request().snapshotName().equals(sctx.snapshotName())))
sctx.acceptException(new
ClusterTopologyCheckedException(err));
}
@@ -764,14 +767,15 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
private IgniteInternalFuture<SnapshotOperationResponse>
initLocalSnapshotStartStage(SnapshotOperationRequest req) {
// Executed inside discovery notifier thread, prior to firing
discovery custom event,
// so it is safe to set new snapshot task inside this method without
synchronization.
- if (clusterSnpReq != null) {
+ if (curSnpOp != null) {
return new GridFinishedFuture<>(new
IgniteCheckedException("Snapshot operation has been rejected. " +
- "Another snapshot operation in progress [req=" + req + ",
curr=" + clusterSnpReq + ']'));
+ "Another snapshot operation in progress [req=" + req + ",
curr=" + curSnpOp + ']'));
}
- req.snapshotFileTree(new SnapshotFileTree(cctx.kernalContext(),
req.snapshotName(), req.snapshotPath()));
+ SnapshotOperation snpOp = new SnapshotOperation(req,
+ new SnapshotFileTree(cctx.kernalContext(), req.snapshotName(),
req.snapshotPath()));
- clusterSnpReq = req;
+ curSnpOp = snpOp;
if (req.incremental())
handleIncrementalSnapshotId(req.requestId(),
cctx.discovery().topologyVersion());
@@ -819,18 +823,18 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
SnapshotMetadata meta;
try {
- meta = readSnapshotMetadata(req.snapshotFileTree().meta());
+ meta = readSnapshotMetadata(snpOp.snapshotFileTree().meta());
- checkIncrementalCanBeCreated(req.snapshotFileTree(), meta);
+ checkIncrementalCanBeCreated(snpOp.snapshotFileTree(), meta);
}
catch (IgniteCheckedException | IOException e) {
return new GridFinishedFuture<>(e);
}
- return initLocalIncrementalSnapshot(req, meta);
+ return initLocalIncrementalSnapshot(snpOp, meta);
}
else
- return initLocalFullSnapshot(req, grpIds, comprGrpIds,
withMetaStorage);
+ return initLocalFullSnapshot(snpOp, grpIds, comprGrpIds,
withMetaStorage);
}
/**
@@ -868,15 +872,16 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
/**
- * @param req Request on snapshot creation.
+ * @param snpOp Snapshot creation operation.
* @param meta Full snapshot metadata.
* @return Future which will be completed when a snapshot has been started.
*/
private IgniteInternalFuture<SnapshotOperationResponse>
initLocalIncrementalSnapshot(
- SnapshotOperationRequest req,
+ SnapshotOperation snpOp,
SnapshotMetadata meta
) {
- SnapshotFileTree sft = req.snapshotFileTree();
+ SnapshotOperationRequest req = snpOp.request();
+ SnapshotFileTree sft = snpOp.snapshotFileTree();
IncrementalSnapshotFileTree ift =
sft.incrementalSnapshotFileTree(req.incrementIndex());
WALPointer lowPtr;
@@ -917,7 +922,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
req.incrementIndex(),
cctx.localNode().consistentId().toString(),
ft.folderName(),
- clusterSnpReq.startTime(),
+ req.startTime(),
markWalFut.result()
);
@@ -935,9 +940,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
cctx.kernalContext().pools().getSnapshotExecutorService().submit(() ->
{
- SnapshotOperationRequest snpReq = clusterSnpReq;
-
- AbstractSnapshotFutureTask<?> task =
locSnpTasks.get(snpReq.snapshotName());
+ AbstractSnapshotFutureTask<?> task =
locSnpTasks.get(req.snapshotName());
if (task == null)
return;
@@ -964,18 +967,20 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
/**
- * @param req Request
+ * @param snpOp Snapshot operation.
* @param grpIds Groups.
* @param comprGrpIds Compressed Groups.
* @param withMetaStorage Flag to include metastorage.
* @return Create snapshot future.
*/
private IgniteInternalFuture<SnapshotOperationResponse>
initLocalFullSnapshot(
- SnapshotOperationRequest req,
+ SnapshotOperation snpOp,
List<Integer> grpIds,
Collection<Integer> comprGrpIds,
boolean withMetaStorage
) {
+ SnapshotOperationRequest req = snpOp.request();
+
if (!isPersistenceEnabled(cctx.gridConfig()) && req.snapshotPath() ==
null)
ft.mkdirSnapshotsRoot();
@@ -1010,7 +1015,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
IgniteInternalFuture<?> task0 = registerSnapshotTask(
- req.snapshotFileTree(),
+ snpOp.snapshotFileTree(),
req.operationalNodeId(),
req.requestId(),
parts,
@@ -1019,7 +1024,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
req.compress(),
req.encrypt(),
req.configOnly(),
- locSndrFactory.apply(req.snapshotFileTree())
+ locSndrFactory.apply(snpOp.snapshotFileTree())
);
if (withMetaStorage) {
@@ -1038,7 +1043,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
.map(n ->
cctx.discovery().node(n).consistentId().toString())
.collect(Collectors.toSet());
- req.snapshotFileTree().root().mkdirs();
+ snpOp.snapshotFileTree().root().mkdirs();
SnapshotFutureTaskResult res =
(SnapshotFutureTaskResult)task0.result();
@@ -1053,7 +1058,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
req.compress(),
cctx.gridConfig().getDataStorageConfiguration().getPageSize(),
grpIds,
- clusterSnpReq.startTime(),
+ req.startTime(),
comprGrpIds,
blts,
res.parts(),
@@ -1064,14 +1069,14 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
encKey == null ? null : encSpi.encryptKey(encKey)
);
- SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta,
req.groups(), cctx.localNode(), req.snapshotFileTree(),
- req.streamerWarning(), true);
+ SnapshotHandlerContext ctx = new SnapshotHandlerContext(meta,
req.groups(), cctx.localNode(), snpOp.snapshotFileTree(),
+ snpOp.streamerWarning(), true);
- req.meta(meta);
+ snpOp.meta(meta);
- storeSnapshotMeta(req.meta(), req.snapshotFileTree().meta());
+ storeSnapshotMeta(snpOp.meta(),
snpOp.snapshotFileTree().meta());
- log.info("Snapshot metafile has been created: " +
req.snapshotFileTree().meta().getAbsolutePath());
+ log.info("Snapshot metafile has been created: " +
snpOp.snapshotFileTree().meta().getAbsolutePath());
return new
SnapshotOperationResponse(handlers.invokeAll(SnapshotHandlerType.CREATE, ctx));
}
@@ -1087,7 +1092,8 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @param err Errors.
*/
private void processLocalSnapshotStartStageResult(UUID id, Map<UUID,
SnapshotOperationResponse> res, Map<UUID, Throwable> err) {
- SnapshotOperationRequest snpReq = clusterSnpReq;
+ SnapshotOperation snpOp = curSnpOp;
+ SnapshotOperationRequest snpReq = snpOp == null ? null :
snpOp.request();
if (snpReq != null && Objects.equals(id, snpReq.requestId()) &&
snpReq.incremental()) {
cctx.tm().txMessageTransformer(null);
@@ -1126,7 +1132,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
}
- snpReq.startStageEnded(true);
+ snpOp.startStageEnded(true);
if (isLocalNodeCoordinator(cctx.discovery())) {
Set<UUID> missed = new HashSet<>(snpReq.nodes());
@@ -1134,24 +1140,27 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
missed.removeAll(err.keySet());
if (cancelled) {
- snpReq.error(new
IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
+ snpOp.error(new
IgniteFutureCancelledCheckedException("Execution of snapshot tasks " +
"has been cancelled by external process [err=" + err + ",
missed=" + missed + ']'));
}
else if (!missed.isEmpty()) {
- snpReq.error(new ClusterTopologyCheckedException("Snapshot
operation interrupted, because baseline " +
+ snpOp.error(new ClusterTopologyCheckedException("Snapshot
operation interrupted, because baseline " +
"node left the cluster. Uncompleted snapshot will be
deleted [missed=" + missed + ']'));
}
else if (!F.isEmpty(err)) {
- snpReq.error(new IgniteCheckedException("Execution of local
snapshot tasks fails. " +
+ snpOp.error(new IgniteCheckedException("Execution of local
snapshot tasks fails. " +
"Uncompleted snapshot will be deleted [err=" + err + ']'));
}
- completeHandlersAsyncIfNeeded(snpReq, res.values())
+ completeHandlersAsyncIfNeeded(snpOp, res.values())
.listen(f -> {
if (f.error() != null)
- snpReq.error(f.error());
+ snpOp.error(f.error());
- endSnpProc.start(snpReq.requestId(), snpReq);
+ endSnpProc.start(
+ snpReq.requestId(),
+ new
SnapshotOperationEndRequest(snpReq.requestId(), snpOp.error(), snpOp.warnings())
+ );
}
);
}
@@ -1187,15 +1196,19 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/**
* Execute the {@link SnapshotHandler#complete(String, Collection)} method
of the snapshot handlers asynchronously.
*
- * @param req Request on snapshot creation.
+ * @param snpOp Snapshot creation operation.
* @param res Results.
* @return Future that will be completed when the handlers are finished
executing.
*/
- private IgniteInternalFuture<Void>
completeHandlersAsyncIfNeeded(SnapshotOperationRequest req,
- Collection<SnapshotOperationResponse> res) {
- if (req.error() != null)
+ private IgniteInternalFuture<Void> completeHandlersAsyncIfNeeded(
+ SnapshotOperation snpOp,
+ Collection<SnapshotOperationResponse> res
+ ) {
+ if (snpOp.error() != null)
return new GridFinishedFuture<>();
+ SnapshotOperationRequest req = snpOp.request();
+
Map<String, List<SnapshotHandlerResult<?>>> clusterHndResults = new
HashMap<>();
for (SnapshotOperationResponse snpRes : res) {
@@ -1215,7 +1228,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
handlers().execSvc.submit(() -> {
try {
handlers.completeAll(SnapshotHandlerType.CREATE,
req.snapshotName(), clusterHndResults, req.nodes(),
- req::warnings);
+ snpOp::warnings);
resultFut.onDone();
}
@@ -1235,60 +1248,61 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
}
/**
- * @param req Request on snapshot creation.
+ * @param endReq Snapshot creation end request.
* @return Future which will be completed when the snapshot will be
finalized.
*/
- private IgniteInternalFuture<SnapshotOperationResponse>
initLocalSnapshotEndStage(SnapshotOperationRequest req) {
- SnapshotOperationRequest snpReq = clusterSnpReq;
+ private IgniteInternalFuture<SnapshotOperationResponse>
initLocalSnapshotEndStage(SnapshotOperationEndRequest endReq) {
+ SnapshotOperation snpOp = curSnpOp;
- if (snpReq == null || !Objects.equals(req.requestId(),
snpReq.requestId()))
+ if (snpOp == null || !Objects.equals(endReq.requestId(),
snpOp.request().requestId()))
return new GridFinishedFuture<>();
- IgniteInternalFuture<?> prepFut = req.incremental() ? wrapMsgsFut :
new GridFinishedFuture<>();
+ SnapshotOperationRequest snpStartReq = curSnpOp.request();
+
+ IgniteInternalFuture<?> prepFut = snpStartReq.incremental() ?
wrapMsgsFut : new GridFinishedFuture<>();
if (cctx.kernalContext().clientNode())
return (IgniteInternalFuture<SnapshotOperationResponse>)prepFut;
return prepFut.chain(() -> {
try {
- if (req.error() != null) {
- snpReq.error(req.error());
+ if (endReq.error() != null) {
+ snpOp.error(endReq.error());
- if (req.incremental())
-
U.delete(snpReq.snapshotFileTree().incrementalSnapshotFileTree(req.incrementIndex()).root());
+ if (snpStartReq.incremental())
+
U.delete(snpOp.snapshotFileTree().incrementalSnapshotFileTree(snpStartReq.incrementIndex()).root());
else
- deleteSnapshot(snpReq.snapshotFileTree());
+ deleteSnapshot(snpOp.snapshotFileTree());
}
- else if (!F.isEmpty(req.warnings())) {
+ else if (!F.isEmpty(endReq.warnings())) {
// Pass the warnings further to the next stage for the
case when snapshot started from not coordinator.
if (!isLocalNodeCoordinator(cctx.discovery()))
- snpReq.warnings(req.warnings());
+ snpOp.warnings(endReq.warnings());
-
snpReq.meta().warnings(Collections.unmodifiableList(req.warnings()));
+
snpOp.meta().warnings(Collections.unmodifiableList(endReq.warnings()));
- storeWarnings(snpReq);
+ storeWarnings(snpOp);
}
- if (req.dump()) {
- if (!U.delete(snpReq.snapshotFileTree().dumpLock()))
- throw new IgniteCheckedException("Lock file can't be
deleted: " + snpReq.snapshotFileTree().dumpLock());
+ if (snpStartReq.dump()) {
+ if (!U.delete(snpOp.snapshotFileTree().dumpLock()))
+ throw new IgniteCheckedException("Lock file can't be
deleted: " + snpOp.snapshotFileTree().dumpLock());
}
else {
removeLastMetaStorageKey();
- if (req.error() == null) {
- Collection<Integer> grpIds =
req.groups().stream().map(CU::cacheId).collect(Collectors.toList());
+ if (endReq.error() == null) {
+ Collection<Integer> grpIds =
snpStartReq.groups().stream().map(CU::cacheId).collect(Collectors.toList());
enableIncrementalSnapshotsCreation(grpIds);
}
}
if (log.isInfoEnabled())
- log.info("Finishing local snapshot operation [req=" + req
+ ']');
+ log.info("Finishing local snapshot operation [op=" + snpOp
+ ']');
}
catch (Exception e) {
- log.error("Finishing local snapshot operation failed " +
- "[req=" + req + ", err=" + e + ']');
+ log.error("Finishing local snapshot operation failed [op=" +
snpOp + ", err=" + e + ']');
throw F.wrap(e);
}
@@ -1303,8 +1317,10 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* process possible snapshot errors, deleting snapshot at second stage
end. Doesn't worth. If an error occurs on
* warnings writing, it is logged only.
*/
- private void storeWarnings(SnapshotOperationRequest snpReq) {
- assert !F.isEmpty(snpReq.warnings());
+ private void storeWarnings(SnapshotOperation snpOp) {
+ SnapshotOperationRequest snpReq = snpOp.request();
+
+ assert !F.isEmpty(snpOp.warnings());
List<ClusterNode> snpNodes =
cctx.kernalContext().cluster().get().nodes().stream()
.filter(n ->
snpReq.nodes().contains(n.id())).collect(Collectors.toList());
@@ -1315,11 +1331,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
if (!oldestBaseline)
return;
- File tempSmf = snpReq.snapshotFileTree().tmpMeta();
- File smf = snpReq.snapshotFileTree().meta();
+ File tempSmf = snpOp.snapshotFileTree().tmpMeta();
+ File smf = snpOp.snapshotFileTree().meta();
try {
- storeSnapshotMeta(snpReq.meta(), tempSmf);
+ storeSnapshotMeta(snpOp.meta(), tempSmf);
Files.move(tempSmf.toPath(), smf.toPath(),
StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING);
@@ -1330,7 +1346,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
catch (Exception e) {
log.error("Failed to store warnings of snapshot '" +
snpReq.snapshotName() +
"' to the snapshot metafile. Snapshot won't contain them. The
warnings: [" +
- String.join(",", snpReq.warnings()) + "].", e);
+ String.join(",", snpOp.warnings()) + "].", e);
}
finally {
U.delete(tempSmf);
@@ -1343,11 +1359,13 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @param err Errors.
*/
private void processLocalSnapshotEndStageResult(UUID id, Map<UUID,
SnapshotOperationResponse> res, Map<UUID, Throwable> err) {
- SnapshotOperationRequest snpReq = clusterSnpReq;
+ SnapshotOperation snpOp = curSnpOp;
- if (snpReq == null || !Objects.equals(id, snpReq.requestId()))
+ if (snpOp == null || !Objects.equals(id, snpOp.request().requestId()))
return;
+ SnapshotOperationRequest snpReq = snpOp.request();
+
Set<UUID> endFail = new HashSet<>(snpReq.nodes());
endFail.removeAll(res.keySet());
@@ -1357,17 +1375,17 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
incSnpId = null;
- if (clusterSnpFut != null && endFail.isEmpty() && snpReq.error()
== null)
+ if (clusterSnpFut != null && endFail.isEmpty() && snpOp.error() ==
null)
warnAtomicCachesInIncrementalSnapshot(snpReq.snapshotName(),
snpReq.incrementIndex(), snpReq.groups());
}
- clusterSnpReq = null;
+ curSnpOp = null;
synchronized (snpOpMux) {
if (clusterSnpFut != null) {
- if (endFail.isEmpty() && snpReq.error() == null) {
- if (!F.isEmpty(snpReq.warnings())) {
- String wrnsLst = U.nl() + "\t- " + String.join(U.nl()
+ "\t- ", snpReq.warnings());
+ if (endFail.isEmpty() && snpOp.error() == null) {
+ if (!F.isEmpty(snpOp.warnings())) {
+ String wrnsLst = U.nl() + "\t- " + String.join(U.nl()
+ "\t- ", snpOp.warnings());
SnapshotWarningException wrn = new
SnapshotWarningException(
"Snapshot create operation completed with warnings
[name=" + snpReq.snapshotName() +
@@ -1384,15 +1402,15 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
log.info(SNAPSHOT_FINISHED_MSG + snpReq);
}
}
- else if (snpReq.error() == null) {
- log.warning("Snapshot error: ", snpReq.error());
+ else if (snpOp.error() == null) {
+ log.warning("Snapshot error: ", snpOp.error());
clusterSnpFut.onDone(new IgniteCheckedException("Snapshot
creation has been finished with an error. " +
"Local snapshot tasks may not finished completely or
finalizing results fails " +
"[fail=" + endFail + ", err=" + err + ']'));
}
else
- clusterSnpFut.onDone(snpReq.error());
+ clusterSnpFut.onDone(snpOp.error());
clusterSnpFut = null;
}
@@ -1403,11 +1421,11 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* @return {@code True} if snapshot operation is in progress.
*/
public boolean isSnapshotCreating() {
- if (clusterSnpReq != null)
+ if (curSnpOp != null)
return true;
synchronized (snpOpMux) {
- return clusterSnpReq != null || clusterSnpFut != null;
+ return curSnpOp != null || clusterSnpFut != null;
}
}
@@ -1422,15 +1440,17 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
* Sets the streamer warning flag to current snapshot process if it is
active.
*/
public void streamerWarning() {
- SnapshotOperationRequest snpTask = currentCreateRequest();
+ SnapshotOperation snpOp = curSnpOp;
- if (snpTask != null && !snpTask.streamerWarning())
- snpTask.streamerWarning(true);
+ if (snpOp != null && !snpOp.streamerWarning())
+ snpOp.streamerWarning(true);
}
/** @return Current create snapshot request. {@code Null} if there is no
create snapshot operation in progress. */
@Nullable public SnapshotOperationRequest currentCreateRequest() {
- return clusterSnpReq;
+ SnapshotOperation snpOp = curSnpOp;
+
+ return snpOp == null ? null : snpOp.request();
}
/**
@@ -1995,7 +2015,7 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
);
}
- if (clusterSnpReq != null)
+ if (curSnpOp != null)
throw new IgniteException("Create snapshot request has
been rejected. Parallel snapshot processes are not allowed.");
boolean snpExists = localSnapshotNames(snpPath).contains(name);
@@ -2258,10 +2278,12 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** {@inheritDoc} */
@Override public void
onDoneBeforeTopologyUnlock(GridDhtPartitionsExchangeFuture fut, @Nullable
Throwable err) {
- if (clusterSnpReq == null || cctx.kernalContext().clientNode() ||
!isSnapshotOperation(fut.firstEvent()))
+ SnapshotOperation snpOp = curSnpOp;
+
+ if (snpOp == null || cctx.kernalContext().clientNode() ||
!isSnapshotOperation(fut.firstEvent()))
return;
- SnapshotOperationRequest snpReq = clusterSnpReq;
+ SnapshotOperationRequest snpReq = snpOp.request();
if (snpReq.incremental())
return;
@@ -2476,12 +2498,12 @@ public class IgniteSnapshotManager extends
GridCacheSharedManagerAdapter
/** @return Current snapshot task. */
public <T extends AbstractSnapshotFutureTask<?>> T
currentSnapshotTask(Class<T> snpTaskCls) {
- SnapshotOperationRequest req = clusterSnpReq;
+ SnapshotOperation snpOp = curSnpOp;
- if (req == null)
+ if (snpOp == null)
return null;
- AbstractSnapshotFutureTask<?> task =
locSnpTasks.get(req.snapshotName());
+ AbstractSnapshotFutureTask<?> task =
locSnpTasks.get(snpOp.request().snapshotName());
if (task == null || task.getClass() != snpTaskCls)
return null;
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
index 3fa731fc0e4..ec646f25420 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCheckProcessRequest.java
@@ -53,7 +53,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
*
* @param reqId Request ID.
* @param snpName Snapshot name.
- * @param nodes Baseline node IDs that must be alive to complete the
operation..
+ * @param nodes Baseline node IDs that must be alive to complete the
operation.
* @param snpPath Snapshot directory path.
* @param grps List of cache group names.
* @param fullCheck If {@code true}, additionally calculates partition
hashes. Otherwise, checks only snapshot integrity
@@ -73,7 +73,7 @@ public class SnapshotCheckProcessRequest extends
AbstractSnapshotOperationReques
int incIdx,
boolean allRestoreHandlers
) {
- super(reqId, snpName, snpPath, grps, 0, nodes);
+ super(reqId, snpName, snpPath, grps, nodes);
assert !F.isEmpty(nodes);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperation.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperation.java
index 51c8c32d0ac..55eee5e52b1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperation.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperation.java
@@ -18,28 +18,117 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
-import java.io.Serializable;
-import java.util.Set;
+import java.util.List;
+import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
+import
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
/**
- * Initial snapshot operation interface.
+ * Current snapshot operation on local node.
*/
-public interface SnapshotOperation extends Serializable {
+public class SnapshotOperation {
+ /** */
+ private final SnapshotOperationRequest req;
+
+ /** Snapshot file tree. */
+ @GridToStringExclude
+ private final SnapshotFileTree sft;
+
+ /** Snapshot metadata. */
+ @GridToStringExclude
+ private SnapshotMetadata meta;
+
+ /** Exception occurred during snapshot operation processing. */
+ private volatile Throwable err;
+
+ /** Warning flag of concurrent inconsistent-by-nature streamer updates. */
+ @GridToStringExclude
+ private volatile boolean streamerWrn;
+
/**
- * Cache group ids included to this snapshot.
- *
- * @return Cache group identifiers.
+ * Snapshot operation warnings. Warnings do not interrupt snapshot process
but raise exception at the end to make
+ * the operation status 'not OK' if no other error occurred.
*/
- public Set<Integer> cacheGroupIds();
+ private volatile List<String> warnings;
+
+ /** Flag indicating that the {@link DistributedProcessType#START_SNAPSHOT}
phase has completed. */
+ private volatile boolean startStageEnded;
+
+ /** */
+ public SnapshotOperation(SnapshotOperationRequest req, SnapshotFileTree
sft) {
+ this.req = req;
+ this.sft = sft;
+ }
+
+ /** */
+ public SnapshotOperationRequest request() {
+ return req;
+ }
+
+ /** @return Snapshot file tree. */
+ public SnapshotFileTree snapshotFileTree() {
+ return sft;
+ }
+
+ /** @return Snapshot metadata. */
+ public SnapshotMetadata meta() {
+ return meta;
+ }
+
+ /** Stores snapshot metadata. */
+ public void meta(SnapshotMetadata meta) {
+ this.meta = meta;
+ }
+
+ /** @return Exception occurred during snapshot operation processing. */
+ public Throwable error() {
+ return err;
+ }
+
+ /** @param err Exception occurred during snapshot operation processing. */
+ public void error(Throwable err) {
+ this.err = err;
+ }
+
+ /** @return {@code True} if the streamer warning flag is set, {@code false}
otherwise. */
+ public boolean streamerWarning() {
+ return streamerWrn;
+ }
+
+ /** Sets the streamer warning flag and returns the newly assigned value. */
+ public boolean streamerWarning(boolean val) {
+ return streamerWrn = val;
+ }
+
+ /** @return Warnings of snapshot operation. */
+ public List<String> warnings() {
+ return warnings;
+ }
+
+ /** @param warnings Warnings of snapshot operation. */
+ public void warnings(List<String> warnings) {
+ assert this.warnings == null;
+
+ this.warnings = warnings;
+ }
/**
- * @return Cache names included to this snapshot.
+ * @return Flag indicating that the {@link
DistributedProcessType#START_SNAPSHOT} phase has completed.
*/
- public Set<String> cacheNames();
+ protected boolean startStageEnded() {
+ return startStageEnded;
+ }
/**
- * @return Any custom extra parameter.
- * In case Map object is provided, contains named snapshot operation
attributes.
+ * @param startStageEnded Flag indicating that the {@link
DistributedProcessType#START_SNAPSHOT} phase has completed.
*/
- public Object extraParameter();
+ protected void startStageEnded(boolean startStageEnded) {
+ this.startStageEnded = startStageEnded;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SnapshotOperation.class, this, super.toString());
+ }
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
new file mode 100644
index 00000000000..ce13f905857
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationEndRequest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.persistence.snapshot;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.UUID;
+import
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
+import org.apache.ignite.internal.util.tostring.GridToStringInclude;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Snapshot operation end request for
+ * {@link DistributedProcessType#END_SNAPSHOT} initiate message.
+ * <p>
+ * All fields are final and assigned once in the constructor. The warnings
+ * list is stored by reference and is not copied.
+ */
+public class SnapshotOperationEndRequest implements Serializable {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /** Request ID. */
+    @GridToStringInclude
+    private final UUID reqId;
+
+    /**
+     * Exception occurred during snapshot operation processing,
+     * may be {@code null}.
+     */
+    @Nullable private final Throwable err;
+
+    /**
+     * Snapshot operation warnings, may be {@code null}. Warnings do not
+     * interrupt snapshot process but raise exception at the end to make
+     * the operation status 'not OK' if no other error occurred.
+     */
+    @Nullable private final List<String> warnings;
+
+    /**
+     * @param id Request ID.
+     * @param err Exception occurred during snapshot operation processing.
+     * @param warnings Warnings of snapshot operation.
+     */
+    public SnapshotOperationEndRequest(
+        UUID id,
+        @Nullable Throwable err,
+        @Nullable List<String> warnings
+    ) {
+        reqId = id;
+        this.err = err;
+        this.warnings = warnings;
+    }
+
+    /** @return Request ID. */
+    public UUID requestId() {
+        return reqId;
+    }
+
+    /**
+     * @return Exception occurred during snapshot operation processing,
+     *      may be {@code null}.
+     */
+    @Nullable public Throwable error() {
+        return err;
+    }
+
+    /** @return Warnings of snapshot operation, may be {@code null}. */
+    @Nullable public List<String> warnings() {
+        return warnings;
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * The parent representation is intentionally not passed: the direct
+     * superclass is {@link Object}, whose {@code toString()} would only
+     * contribute a class-name/identity-hash string.
+     */
+    @Override public String toString() {
+        return S.toString(SnapshotOperationEndRequest.class, this);
+    }
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
index 8d3914828a6..e4a04bec8a5 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotOperationRequest.java
@@ -18,18 +18,14 @@
package org.apache.ignite.internal.processors.cache.persistence.snapshot;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
import java.util.UUID;
-import
org.apache.ignite.internal.processors.cache.persistence.filename.SnapshotFileTree;
import org.apache.ignite.internal.util.distributed.DistributedProcess;
-import
org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType;
-import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
/**
- * Snapshot operation start request for {@link DistributedProcess} initiate
message.
+ * Snapshot operation start request for {@link
DistributedProcess.DistributedProcessType#START_SNAPSHOT} initiate message.
*/
public class SnapshotOperationRequest extends AbstractSnapshotOperationRequest
{
/** Serial version uid. */
@@ -38,32 +34,6 @@ public class SnapshotOperationRequest extends
AbstractSnapshotOperationRequest {
/** Operational node ID. */
private final UUID opNodeId;
- /** Exception occurred during snapshot operation processing. */
- private volatile Throwable err;
-
- /**
- * Snapshot operation warnings. Warnings do not interrupt snapshot process
but raise exception at the end to make
- * the operation status 'not OK' if no other error occurred.
- */
- private volatile List<String> warnings;
-
- /** Snapshot metadata. */
- @GridToStringExclude
- private transient SnapshotMetadata meta;
-
- /** Snapshot file tree. */
- @GridToStringExclude
- private transient SnapshotFileTree sft;
-
- /**
- * Warning flag of concurrent inconsistent-by-nature streamer updates.
- */
- @GridToStringExclude
- private transient volatile boolean streamerWrn;
-
- /** Flag indicating that the {@link DistributedProcessType#START_SNAPSHOT}
phase has completed. */
- private transient volatile boolean startStageEnded;
-
/** If {@code true} then incremental snapshot requested. */
private final boolean incremental;
@@ -115,7 +85,7 @@ public class SnapshotOperationRequest extends
AbstractSnapshotOperationRequest {
boolean encrypt,
boolean configOnly
) {
- super(reqId, snpName, snpPath, grps, incIdx, nodes);
+ super(reqId, snpName, snpPath, grps, nodes);
this.opNodeId = opNodeId;
this.incremental = incremental;
@@ -134,20 +104,6 @@ public class SnapshotOperationRequest extends
AbstractSnapshotOperationRequest {
return opNodeId;
}
- /**
- * @return Exception occurred during snapshot operation processing.
- */
- public Throwable error() {
- return err;
- }
-
- /**
- * @param err Exception occurred during snapshot operation processing.
- */
- public void error(Throwable err) {
- this.err = err;
- }
-
/** @return {@code True} if incremental snapshot requested. */
public boolean incremental() {
return incremental;
@@ -183,78 +139,6 @@ public class SnapshotOperationRequest extends
AbstractSnapshotOperationRequest {
return configOnly;
}
- /**
- * @return Flag indicating that the {@link
DistributedProcessType#START_SNAPSHOT} phase has completed.
- */
- protected boolean startStageEnded() {
- return startStageEnded;
- }
-
- /**
- * @param startStageEnded Flag indicating that the {@link
DistributedProcessType#START_SNAPSHOT} phase has completed.
- */
- protected void startStageEnded(boolean startStageEnded) {
- this.startStageEnded = startStageEnded;
- }
-
- /**
- * @return Warnings of snapshot operation.
- */
- public List<String> warnings() {
- return warnings;
- }
-
- /**
- * @param warnings Warnings of snapshot operation.
- */
- public void warnings(List<String> warnings) {
- assert this.warnings == null;
-
- this.warnings = warnings;
- }
-
- /**
- * {@code True} If the streamer warning flag is set. {@code False}
otherwise.
- */
- public boolean streamerWarning() {
- return streamerWrn;
- }
-
- /**
- * Sets the streamer warning flag.
- */
- public boolean streamerWarning(boolean val) {
- return streamerWrn = val;
- }
-
- /**
- * @return Snapshot metadata.
- */
- public SnapshotMetadata meta() {
- return meta;
- }
-
- /**
- * Stores snapshot metadata.
- */
- public void meta(SnapshotMetadata meta) {
- this.meta = meta;
- }
-
- /**
- * Stores snapshot file tree.
- */
- public void snapshotFileTree(SnapshotFileTree sft) {
- this.sft = sft;
- }
-
- /**
- * @return Snapshot file tree.
- */
- public SnapshotFileTree snapshotFileTree() {
- return sft;
- }
-
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SnapshotOperationRequest.class, this);
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties
b/modules/core/src/main/resources/META-INF/classnames.properties
index e4f3b99b972..a6b3f0122c0 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1312,7 +1312,6 @@
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandler
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerResult
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotHandlerType
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotMetadata
-org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperationRequest
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreProcess$SnapshotRestoreOperationResponse
org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotRestoreStatusTask