GG-11860 Implement snapshot status on platform level: introduce SnapshotOperationFuture and add gettingOngoingOperation to the API
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d37d4df9 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d37d4df9 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d37d4df9 Branch: refs/heads/ignite-gg-8.0.3.ea6-clients-test Commit: d37d4df928ae2c9a1ba8cb5a236e4c3a7ab1b361 Parents: b51a2f8 Author: Eduard Shangareev <[email protected]> Authored: Mon Mar 13 11:33:55 2017 +0300 Committer: Eduard Shangareev <[email protected]> Committed: Mon Mar 13 11:33:55 2017 +0300 ---------------------------------------------------------------------- ...ishSnapshotOperationAckDiscoveryMessage.java | 74 ++++++++++++++++++++ .../pagemem/snapshot/SnapshotOperation.java | 10 +++ .../StartSnapshotOperationDiscoveryMessage.java | 35 +++++++-- .../IgniteCacheDatabaseSharedManager.java | 10 +-- .../GridDhtPartitionsExchangeFuture.java | 7 +- 5 files changed, 123 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d37d4df9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/FinishSnapshotOperationAckDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/FinishSnapshotOperationAckDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/FinishSnapshotOperationAckDiscoveryMessage.java new file mode 100644 index 0000000..e19ddc4 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/FinishSnapshotOperationAckDiscoveryMessage.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.pagemem.snapshot; + +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class FinishSnapshotOperationAckDiscoveryMessage implements DiscoveryCustomMessage { + /** Id. */ + private final IgniteUuid id = IgniteUuid.randomUuid(); + + /** Op id. */ + private final IgniteUuid opId; + + /** Success. */ + private final boolean success; + + /** + * @param opId Op id. + * @param success Success. + */ + public FinishSnapshotOperationAckDiscoveryMessage(IgniteUuid opId, boolean success) { + this.opId = opId; + this.success = success; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid id() { + return id; + } + + /** {@inheritDoc} */ + @Nullable @Override public DiscoveryCustomMessage ackMessage() { + return null; + } + + /** {@inheritDoc} */ + @Override public boolean isMutable() { + return false; + } + + /** + * @return Op id. + */ + public IgniteUuid operationId() { + return opId; + } + + /** + * @return Success. 
+ */ + public boolean success() { + return success; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/d37d4df9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java index f3b5eee..36577a6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/SnapshotOperation.java @@ -154,4 +154,14 @@ public class SnapshotOperation implements Serializable { result = 31 * result + (extraParam != null ? extraParam.hashCode() : 0); return result; } + + @Override public String toString() { + return "SnapshotOperation{" + + "type=" + type + + ", snapshotId=" + snapshotId + + ", cacheNames=" + cacheNames + + ", msg='" + msg + '\'' + + ", extraParam=" + extraParam + + '}'; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d37d4df9/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationDiscoveryMessage.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationDiscoveryMessage.java index 2373a9b..700f404 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/snapshot/StartSnapshotOperationDiscoveryMessage.java @@ -21,6 +21,8 @@ package org.apache.ignite.internal.pagemem.snapshot; import java.util.HashMap; import java.util.Map; 
import java.util.UUID; + +import org.apache.ignite.IgniteException; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -33,8 +35,11 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe /** */ private static final long serialVersionUID = 0L; + /** Id. */ + private IgniteUuid id = IgniteUuid.randomUuid(); + /** Custom message ID. */ - private IgniteUuid id; + private IgniteUuid operationId; /** Snapshot operation. */ private SnapshotOperation snapshotOperation; @@ -42,6 +47,9 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe /** */ private UUID initiatorId; + /** Validated by coordinator. */ + private boolean validatedByCoordinator = false; + /** Error. */ private Exception err; @@ -56,11 +64,11 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe * @param initiatorId initiator node id */ public StartSnapshotOperationDiscoveryMessage( - IgniteUuid id, + IgniteUuid operationId, SnapshotOperation snapshotOperation, UUID initiatorId ) { - this.id = id; + this.operationId = operationId; this.snapshotOperation = snapshotOperation; this.initiatorId = initiatorId; } @@ -98,7 +106,7 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe /** * @return Initiator node id. */ - public UUID initiatorId() { + public UUID initiatorNodeId() { return initiatorId; } @@ -107,6 +115,11 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe return id; } + /** @return Operation id. */ + public IgniteUuid operationId() { + return operationId; + } + /** * @param cacheId Cache id. */ @@ -129,6 +142,16 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe return lastSnapshotIdForCache.get(cacheId); } + /** @return Validated by coordinator.
*/ + public boolean validatedByCoordinator() { + return validatedByCoordinator; + } + + /** Validated by coordinator. */ + public void validatedByCoordinator(boolean validatedByCoordinator) { + this.validatedByCoordinator = validatedByCoordinator; + } + /** * @param cacheId Cache id. * @param id Id. @@ -140,11 +163,11 @@ public class StartSnapshotOperationDiscoveryMessage implements DiscoveryCustomMe /** {@inheritDoc} */ @Nullable @Override public DiscoveryCustomMessage ackMessage() { return new StartSnapshotOperationAckDiscoveryMessage( - id, + operationId, snapshotOperation, lastFullSnapshotIdForCache, lastSnapshotIdForCache, - err, + err != null ? err : (validatedByCoordinator? null : new IgniteException("Coordinator didn't validate operation!")), initiatorId); } http://git-wip-us.apache.org/repos/asf/ignite/blob/d37d4df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java index 95ed3ae..3380e16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/IgniteCacheDatabaseSharedManager.java @@ -19,9 +19,9 @@ package org.apache.ignite.internal.processors.cache.database; import java.io.File; import java.util.Collection; +import java.util.UUID; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.configuration.MemoryConfiguration; import org.apache.ignite.internal.GridKernalContext; import 
org.apache.ignite.internal.IgniteInternalFuture; @@ -29,7 +29,7 @@ import org.apache.ignite.internal.mem.DirectMemoryProvider; import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider; import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider; import org.apache.ignite.internal.pagemem.PageMemory; -import org.apache.ignite.internal.pagemem.snapshot.StartSnapshotOperationAckDiscoveryMessage; +import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation; import org.apache.ignite.internal.pagemem.impl.PageMemoryNoStoreImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; @@ -208,11 +208,13 @@ public class IgniteCacheDatabaseSharedManager extends GridCacheSharedManagerAdap } /** - * @param snapshotMsg Snapshot message. + * @param initiatorNodeId Initiator node id. + * @param snapshotOperation Snapshot operation. * @return Snapshot creation init future or {@code null} if snapshot is not available. * @throws IgniteCheckedException If failed.
*/ - @Nullable public IgniteInternalFuture startLocalSnapshotOperation(StartSnapshotOperationAckDiscoveryMessage snapshotMsg) + @Nullable public IgniteInternalFuture startLocalSnapshotOperation(UUID initiatorNodeId, + SnapshotOperation snapshotOperation) throws IgniteCheckedException { return null; } http://git-wip-us.apache.org/repos/asf/ignite/blob/d37d4df9/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java index 4c179e6..925848b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java @@ -889,11 +889,12 @@ public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityT } /** - * @param snapshotOperationMsg Snapshot operation message. + * @param snapOpMsg Snapshot operation message. */ - private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapshotOperationMsg + private void startLocalSnasphotOperation(StartSnapshotOperationAckDiscoveryMessage snapOpMsg ) throws IgniteCheckedException { - IgniteInternalFuture fut = cctx.database().startLocalSnapshotOperation(snapshotOperationMsg); + IgniteInternalFuture fut = cctx.database() + .startLocalSnapshotOperation(snapOpMsg.initiatorNodeId(), snapOpMsg.snapshotOperation()); if (fut != null) fut.get();
