alex-plekhanov commented on a change in pull request #7607: IGNITE-11073: Create consistent partitions copy on each cluster node
URL: https://github.com/apache/ignite/pull/7607#discussion_r407968807
########## File path: modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java ########## @@ -0,0 +1,1894 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.Serializable; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Queue; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import 
java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collectors; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSnapshot; +import org.apache.ignite.binary.BinaryType; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.events.DiscoveryEvent; +import org.apache.ignite.failure.FailureContext; +import org.apache.ignite.failure.FailureType; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.GridTopic; +import org.apache.ignite.internal.IgniteFeatures; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.NodeStoppingException; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.events.DiscoveryCustomEvent; +import org.apache.ignite.internal.managers.communication.GridIoManager; +import org.apache.ignite.internal.managers.communication.GridMessageListener; +import org.apache.ignite.internal.managers.communication.TransmissionCancelledException; +import org.apache.ignite.internal.managers.communication.TransmissionHandler; +import org.apache.ignite.internal.managers.communication.TransmissionMeta; +import org.apache.ignite.internal.managers.communication.TransmissionPolicy; +import org.apache.ignite.internal.managers.discovery.DiscoCache; +import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; +import 
org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; +import org.apache.ignite.internal.managers.eventstorage.DiscoveryEventListener; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.CacheType; +import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.PartitionsExchangeAware; +import org.apache.ignite.internal.processors.cache.persistence.StorageException; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreFactory; +import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory; +import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; +import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; +import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; 
+import org.apache.ignite.internal.processors.marshaller.MappedName; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; +import org.apache.ignite.internal.util.GridBusyLock; +import org.apache.ignite.internal.util.distributed.DistributedProcess; +import org.apache.ignite.internal.util.distributed.InitMessage; +import org.apache.ignite.internal.util.future.GridFinishedFuture; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.future.IgniteFinishedFutureImpl; +import org.apache.ignite.internal.util.future.IgniteFutureImpl; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteFuture; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.thread.IgniteThreadPoolExecutor; +import org.apache.ignite.thread.OomExceptionHandler; +import org.jetbrains.annotations.Nullable; + +import static java.nio.file.StandardOpenOption.READ; +import static org.apache.ignite.cluster.ClusterState.active; +import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY; +import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; +import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; +import static org.apache.ignite.internal.IgniteFeatures.PERSISTENCE_CACHE_SNAPSHOT; +import static org.apache.ignite.internal.IgniteFeatures.nodeSupports; +import static org.apache.ignite.internal.MarshallerContextImpl.saveMappings; +import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; +import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; 
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.MAX_PARTITION_ID; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.INDEX_FILE_NAME; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.PART_FILE_TEMPLATE; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFile; +import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.getPartitionFileName; +import static org.apache.ignite.internal.processors.cache.persistence.filename.PdsConsistentIdProcessor.DB_DEFAULT_FOLDER; +import static org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId.getFlagByPartId; +import static org.apache.ignite.internal.util.IgniteUtils.isLocalNodeCoordinator; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.END_SNAPSHOT; +import static org.apache.ignite.internal.util.distributed.DistributedProcess.DistributedProcessType.START_SNAPSHOT; + +/** + * Internal implementation of snapshot operations over persistence caches. + * <p> + * There are two major actions available: + * <ul> + * <li>Create snapshot of the whole cluster cache groups by triggering PME to achieve consistency.</li> + * <li>Create local snapshot of requested cache groups and send it to the node which request this operation. + * Cache groups will be transmitted using internal API for transferring files. See {@link TransmissionHandler}.</li> + * </ul> + */ +public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter + implements IgniteSnapshot, PartitionsExchangeAware, MetastorageLifecycleListener { + /** File with delta pages suffix. */ + public static final String DELTA_SUFFIX = ".delta"; + + /** File name template consists of delta pages. 
*/ + public static final String PART_DELTA_TEMPLATE = PART_FILE_TEMPLATE + DELTA_SUFFIX; + + /** File name template for index delta pages. */ + public static final String INDEX_DELTA_NAME = INDEX_FILE_NAME + DELTA_SUFFIX; + + /** Text Reason for checkpoint to start snapshot operation. */ + public static final String CP_SNAPSHOT_REASON = "Checkpoint started to enforce snapshot operation: %s"; + + /** Name prefix for each remote snapshot operation. */ + public static final String RMT_SNAPSHOT_PREFIX = "snapshot_"; + + /** Default snapshot directory for loading remote snapshots. */ + public static final String DFLT_SNAPSHOT_TMP_DIR = "snp"; + + /** Timeout in millisecond for snapshot operations. */ + public static final long DFLT_SNAPSHOT_TIMEOUT = 15_000L; + + /** Snapshot in progress error message. */ + public static final String SNP_IN_PROGRESS_ERR_MSG = "Operation rejected due to the snapshot operation in progress."; + + /** Error message to finalize snapshot tasks. */ + public static final String SNP_NODE_STOPPING_ERR_MSG = "Snapshot has been cancelled due to the local node " + + "is stopping"; + + /** Metastorage key to save currently running snapshot. */ + public static final String SNP_RUNNING_KEY = "snapshot-running"; + + /** Prefix for snapshot threads. */ + private static final String SNAPSHOT_RUNNER_THREAD_PREFIX = "snapshot-runner"; + + /** Total number of thread to perform local snapshot. */ + private static final int SNAPSHOT_THREAD_POOL_SIZE = 4; + + /** Default snapshot topic to receive snapshots from remote node. */ + private static final Object DFLT_INITIAL_SNAPSHOT_TOPIC = GridTopic.TOPIC_SNAPSHOT.topic("rmt_snp"); + + /** File transmission parameter of cache group id. */ + private static final String SNP_GRP_ID_PARAM = "grpId"; + + /** File transmission parameter of cache partition id. */ + private static final String SNP_PART_ID_PARAM = "partId"; + + /** File transmission parameter of node-sender directory path with its consistentId (e.g. 
db/IgniteNode0). */ + private static final String SNP_DB_NODE_PATH_PARAM = "dbNodePath"; + + /** File transmission parameter of a cache directory with is currently sends its partitions. */ + private static final String SNP_CACHE_DIR_NAME_PARAM = "cacheDirName"; + + /** Snapshot parameter name for a file transmission. */ + private static final String SNP_NAME_PARAM = "snpName"; + + /** Total snapshot files count which receiver should expect to receive. */ + private static final String SNP_PARTITIONS_CNT = "partsCnt"; + + /** + * Local buffer to perform copy-on-write operations with pages for {@code SnapshotFutureTask.PageStoreSerialWriter}s. + * It is important to have only only buffer per thread (instead of creating each buffer per + * each {@code SnapshotFutureTask.PageStoreSerialWriter}) this is redundant and can lead to OOM errors. Direct buffer + * deallocate only when ByteBuffer is garbage collected, but it can get out of off-heap memory before it. + */ + private final ThreadLocal<ByteBuffer> locBuff; + + /** Map of registered cache snapshot processes and their corresponding contexts. */ + private final ConcurrentMap<String, SnapshotFutureTask> locSnpTasks = new ConcurrentHashMap<>(); + + /** Lock to protect the resources is used. */ + private final GridBusyLock busyLock = new GridBusyLock(); + + /** Requested snapshot from remote node. */ + private final AtomicReference<RemoteSnapshotFuture> rmtSnpReq = new AtomicReference<>(); + + /** Mutex used to order cluster snapshot operation progress. */ + private final Object snpOpMux = new Object(); + + /** Take snapshot operation procedure. */ + private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> startSnpProc; + + /** Check previously preformed snapshot operation and delete uncompleted files if need. */ + private final DistributedProcess<SnapshotOperationRequest, SnapshotOperationResponse> endSnpProc; + + /** Resolved persistent data storage settings. 
*/ + private volatile PdsFolderSettings pdsSettings; + + /** Fully initialized metastorage. */ + private volatile ReadWriteMetastorage metaStorage; + + /** Local snapshot sender factory. */ + private Function<String, SnapshotSender> locSndrFactory = this::localSnapshotSender; + + /** Main snapshot directory to save created snapshots. */ + private volatile File locSnpDir; + + /** + * Working directory for loaded snapshots from the remote nodes and storing + * temporary partition delta-files of locally started snapshot process. + */ + private File tmpWorkDir; + + /** Factory to working with delta as file storage. */ + private volatile FileIOFactory ioFactory = new RandomAccessFileIOFactory(); + + /** Factory to create page store for restore. */ + private volatile BiFunction<Integer, Boolean, FilePageStoreFactory> storeFactory; + + /** Snapshot thread pool to perform local partition snapshots. */ + private ExecutorService snpRunner; + + /** System discovery message listener. */ + private DiscoveryEventListener discoLsnr; + + /** Cluster snapshot operation requested by user. */ + private GridFutureAdapter<Void> clusterSnpFut; + + /** Current snapshot operation on local node. */ + private volatile SnapshotOperationRequest clusterSnpRq; + + /** {@code true} if recovery process occurred for snapshot. */ + private volatile boolean recovered; + + /** + * @param ctx Kernal context. + */ + public IgniteSnapshotManager(GridKernalContext ctx) { + locBuff = ThreadLocal.withInitial(() -> + ByteBuffer.allocateDirect(ctx.config().getDataStorageConfiguration().getPageSize()) + .order(ByteOrder.nativeOrder())); + + startSnpProc = new DistributedProcess<>(ctx, START_SNAPSHOT, this::startLocalSnapshot, + this::startLocalSnapshotResult); + + endSnpProc = new DistributedProcess<>(ctx, END_SNAPSHOT, this::endLocalSnapshot, + this::endLocalSnapshotResult); + } + + /** + * @param snapshotCacheDir Snapshot directory to store files. + * @param partId Cache partition identifier. 
+ * @return A file representation. + */ + public static File partDeltaFile(File snapshotCacheDir, int partId) { + return new File(snapshotCacheDir, partDeltaFileName(partId)); + } + + /** + * @param partId Partition id. + * @return File name of delta partition pages. + */ + public static String partDeltaFileName(int partId) { + assert partId <= MAX_PARTITION_ID || partId == INDEX_PARTITION; + + return partId == INDEX_PARTITION ? INDEX_DELTA_NAME : String.format(PART_DELTA_TEMPLATE, partId); + } + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + super.start0(); + + GridKernalContext ctx = cctx.kernalContext(); + + if (ctx.clientNode()) + return; + + if (!CU.isPersistenceEnabled(ctx.config())) + return; + + snpRunner = new IgniteThreadPoolExecutor(SNAPSHOT_RUNNER_THREAD_PREFIX, + cctx.igniteInstanceName(), + SNAPSHOT_THREAD_POOL_SIZE, + SNAPSHOT_THREAD_POOL_SIZE, + IgniteConfiguration.DFLT_THREAD_KEEP_ALIVE_TIME, + new LinkedBlockingQueue<>(), + SYSTEM_POOL, + new OomExceptionHandler(ctx)); + + assert cctx.pageStore() instanceof FilePageStoreManager; + + FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore(); + + pdsSettings = cctx.kernalContext().pdsFolderResolver().resolveFolders(); + + locSnpDir = snapshotPath(ctx.config()).toFile(); + tmpWorkDir = Paths.get(storeMgr.workDir().getAbsolutePath(), DFLT_SNAPSHOT_TMP_DIR).toFile(); + + U.ensureDirectory(locSnpDir, "snapshot work directory", log); + U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log); + + storeFactory = storeMgr::getPageStoreFactory; + + cctx.exchange().registerExchangeAwareComponent(this); + ctx.internalSubscriptionProcessor().registerMetastorageListener(this); + + // Receive remote snapshots requests. 
+ cctx.gridIO().addMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC, new GridMessageListener() { + @Override public void onMessage(UUID nodeId, Object msg, byte plc) { + if (!busyLock.enterBusy()) + return; + + try { + if (msg instanceof SnapshotRequestMessage) { + SnapshotRequestMessage reqMsg0 = (SnapshotRequestMessage)msg; + String snpName = reqMsg0.snapshotName(); + + synchronized (this) { + SnapshotFutureTask task = lastScheduledRemoteSnapshotTask(nodeId); + + if (task != null) { + // Task will also be removed from local map due to the listener on future done. + task.cancel(); + + log.info("Snapshot request has been cancelled due to another request received " + + "[prevSnpResp=" + task + ", msg0=" + reqMsg0 + ']'); + } + } + + SnapshotFutureTask task = registerSnapshotTask(snpName, + nodeId, + reqMsg0.parts(), + remoteSnapshotSender(snpName, nodeId)); + + task.listen(f -> { + if (f.error() == null) + return; + + U.error(log, "Failed to process request of creating a snapshot " + + "[from=" + nodeId + ", msg=" + reqMsg0 + ']', f.error()); + + try { + cctx.gridIO().sendToCustomTopic(nodeId, + DFLT_INITIAL_SNAPSHOT_TOPIC, + new SnapshotResponseMessage(reqMsg0.snapshotName(), f.error().getMessage()), + SYSTEM_POOL); + } + catch (IgniteCheckedException ex0) { + U.error(log, "Fail to send the response message with processing snapshot request " + + "error [request=" + reqMsg0 + ", nodeId=" + nodeId + ']', ex0); + } + }); + + task.start(); + } + else if (msg instanceof SnapshotResponseMessage) { + SnapshotResponseMessage respMsg0 = (SnapshotResponseMessage)msg; + + RemoteSnapshotFuture fut0 = rmtSnpReq.get(); + + if (fut0 == null || !fut0.snpName.equals(respMsg0.snapshotName())) { + if (log.isInfoEnabled()) { + log.info("A stale snapshot response message has been received. Will be ignored " + + "[fromNodeId=" + nodeId + ", response=" + respMsg0 + ']'); + } + + return; + } + + if (respMsg0.errorMessage() != null) { + fut0.onDone(new IgniteCheckedException("Request cancelled. 
The snapshot operation stopped " + + "on the remote node with an error: " + respMsg0.errorMessage())); + } + } + } + catch (Throwable e) { + U.error(log, "Processing snapshot request from remote node fails with an error", e); + + cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); + } + finally { + busyLock.leaveBusy(); + } + } + }); + + cctx.gridEvents().addDiscoveryEventListener(discoLsnr = (evt, discoCache) -> { + if (!busyLock.enterBusy()) + return; + + try { + UUID leftNodeId = evt.eventNode().id(); + + if (evt.type() == EVT_DISCOVERY_CUSTOM_EVT) { + DiscoveryCustomEvent evt0 = (DiscoveryCustomEvent)evt; + + if (evt0.customMessage() instanceof InitMessage) { + InitMessage<?> msg = (InitMessage<?>)evt0.customMessage(); + + // This happens when #takeSnapshot() method already invoked and distributed process + // starts its action. + if (msg.type() == START_SNAPSHOT.ordinal()) { + assert clusterSnpRq != null || + !CU.baselineNode(cctx.localNode(), cctx.kernalContext().state().clusterState()) : evt; + + DiscoveryCustomEvent customEvt = new DiscoveryCustomEvent(); + + customEvt.node(evt0.node()); + customEvt.eventNode(evt0.eventNode()); + customEvt.affinityTopologyVersion(evt0.affinityTopologyVersion()); + customEvt.customMessage(new SnapshotStartDiscoveryMessage(discoCache, msg.processId())); + + // Handle new event inside discovery thread, so no guarantees will be violated. 
+ cctx.exchange().onDiscoveryEvent(customEvt, discoCache); + } + } + } + else if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED) { + SnapshotOperationRequest snpRq = clusterSnpRq; + + for (SnapshotFutureTask sctx : locSnpTasks.values()) { + if (sctx.sourceNodeId().equals(leftNodeId) || + (snpRq != null && + snpRq.snpName.equals(sctx.snapshotName()) && + snpRq.bltNodes.contains(leftNodeId))) { + sctx.acceptException(new ClusterTopologyCheckedException("The node which requested snapshot " + + "creation has left the grid")); + } + } + + RemoteSnapshotFuture snpTrFut = rmtSnpReq.get(); + + if (snpTrFut != null && snpTrFut.rmtNodeId.equals(leftNodeId)) { + snpTrFut.onDone(new ClusterTopologyCheckedException("The node from which a snapshot has been " + + "requested left the grid")); + } + } + } + finally { + busyLock.leaveBusy(); + } + }, EVT_NODE_LEFT, EVT_NODE_FAILED, EVT_DISCOVERY_CUSTOM_EVT); + + // Remote snapshot handler. + cctx.kernalContext().io().addTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC, new TransmissionHandler() { + @Override public void onEnd(UUID nodeId) { + RemoteSnapshotFuture snpTrFut = rmtSnpReq.get(); + + assert snpTrFut.stores.isEmpty() : snpTrFut.stores.entrySet(); + assert snpTrFut.partsLeft == 0 : snpTrFut; + + snpTrFut.onDone(); + + log.info("Requested snapshot from remote node has been fully received " + + "[snpName=" + snpTrFut.snpName + ", snpTrans=" + snpTrFut + ']'); + } + + /** {@inheritDoc} */ + @Override public void onException(UUID nodeId, Throwable err) { + RemoteSnapshotFuture fut = rmtSnpReq.get(); + + if (fut == null) + return; + + if (fut.rmtNodeId.equals(nodeId)) + fut.onDone(err); + } + + /** {@inheritDoc} */ + @Override public String filePath(UUID nodeId, TransmissionMeta fileMeta) { + Integer partId = (Integer)fileMeta.params().get(SNP_PART_ID_PARAM); + String rmtDbNodePath = (String)fileMeta.params().get(SNP_DB_NODE_PATH_PARAM); + String cacheDirName = 
(String)fileMeta.params().get(SNP_CACHE_DIR_NAME_PARAM); + + RemoteSnapshotFuture transFut = resolve(nodeId, fileMeta); + + try { + File cacheDir = U.resolveWorkDirectory(tmpWorkDir.getAbsolutePath(), + Paths.get(transFut.snpName, rmtDbNodePath, cacheDirName).toString(), + false); + + return new File(cacheDir, getPartitionFileName(partId)).getAbsolutePath(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + + /** + * @param nodeId Remote node id. + * @param meta Transmission meta. + * @return Resolved transmission future. + */ + private RemoteSnapshotFuture resolve(UUID nodeId, TransmissionMeta meta) { + String snpName = (String)meta.params().get(SNP_NAME_PARAM); + Integer partsCnt = (Integer)meta.params().get(SNP_PARTITIONS_CNT); + + RemoteSnapshotFuture snpTrFut = rmtSnpReq.get(); + + if (snpTrFut == null || !snpTrFut.snpName.equals(snpName)) { + throw new TransmissionCancelledException("Stale snapshot transmission will be ignored " + + "[snpName=" + snpName + ", meta=" + meta + ", snpTrFut=" + snpTrFut + ']'); + } + + assert snpTrFut.snpName.equals(snpName) && snpTrFut.rmtNodeId.equals(nodeId) : + "Another transmission in progress [snpTrFut=" + snpTrFut + ", nodeId=" + snpName + ']'; + + if (snpTrFut.partsLeft == -1) + snpTrFut.partsLeft = partsCnt; + + return snpTrFut; + } + + /** + * @param snpTrans Current snapshot transmission. + * @param grpPartId Pair of group id and its partition id. 
+ */ + private void finishRecover(RemoteSnapshotFuture snpTrans, GroupPartitionId grpPartId) { + FilePageStore pageStore = null; + + try { + pageStore = snpTrans.stores.remove(grpPartId); + + pageStore.finishRecover(); + + snpTrans.partConsumer.accept(new File(pageStore.getFileAbsolutePath()), grpPartId); + + snpTrans.partsLeft--; + } + catch (StorageException e) { + throw new IgniteException(e); + } + finally { + U.closeQuiet(pageStore); + } + } + + /** {@inheritDoc} */ + @Override public Consumer<ByteBuffer> chunkHandler(UUID nodeId, TransmissionMeta initMeta) { + Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM); + Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM); + + RemoteSnapshotFuture snpTrFut = resolve(nodeId, initMeta); + + GroupPartitionId grpPartId = new GroupPartitionId(grpId, partId); + FilePageStore pageStore = snpTrFut.stores.get(grpPartId); + + if (pageStore == null) { + throw new IgniteException("Partition must be loaded before applying snapshot delta pages " + + "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']'); + } + + pageStore.beginRecover(); + + // No snapshot delta pages received. Finalize recovery. 
+ if (initMeta.count() == 0) + finishRecover(snpTrFut, grpPartId); + + return new Consumer<ByteBuffer>() { + final LongAdder transferred = new LongAdder(); + + @Override public void accept(ByteBuffer buff) { + try { + assert initMeta.count() != 0 : initMeta; + + RemoteSnapshotFuture fut0 = rmtSnpReq.get(); + + if (fut0 == null || !fut0.equals(snpTrFut) || fut0.isCancelled()) { + throw new TransmissionCancelledException("Snapshot request is cancelled " + + "[snpName=" + snpTrFut.snpName + ", grpId=" + grpId + ", partId=" + partId + ']'); + } + + pageStore.write(PageIO.getPageId(buff), buff, 0, false); + + transferred.add(buff.capacity()); + + if (transferred.longValue() == initMeta.count()) + finishRecover(snpTrFut, grpPartId); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + }; + } + + /** {@inheritDoc} */ + @Override public Consumer<File> fileHandler(UUID nodeId, TransmissionMeta initMeta) { + Integer grpId = (Integer)initMeta.params().get(SNP_GRP_ID_PARAM); + Integer partId = (Integer)initMeta.params().get(SNP_PART_ID_PARAM); + String snpName = (String)initMeta.params().get(SNP_NAME_PARAM); + + assert grpId != null; + assert partId != null; + assert snpName != null; + assert storeFactory != null; + + RemoteSnapshotFuture transFut = rmtSnpReq.get(); + + if (transFut == null) { + throw new IgniteException("Snapshot transmission with given name doesn't exists " + + "[snpName=" + snpName + ", grpId=" + grpId + ", partId=" + partId + ']'); + } + + return new Consumer<File>() { + @Override public void accept(File file) { + RemoteSnapshotFuture fut0 = rmtSnpReq.get(); + + if (fut0 == null || !fut0.equals(transFut) || fut0.isCancelled()) { + throw new TransmissionCancelledException("Snapshot request is cancelled [snpName=" + snpName + + ", grpId=" + grpId + ", partId=" + partId + ']'); + } + + busyLock.enterBusy(); + + try { + FilePageStore pageStore = (FilePageStore)storeFactory + .apply(grpId, false) + 
.createPageStore(getFlagByPartId(partId), + file::toPath, + new LongAdderMetric("NO_OP", null)); + + transFut.stores.put(new GroupPartitionId(grpId, partId), pageStore); + + pageStore.init(); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + finally { + busyLock.leaveBusy(); + } + } + }; + } + }); + } + + /** {@inheritDoc} */ + @Override protected void stop0(boolean cancel) { + busyLock.block(); + + try { + // Try stop all snapshot processing if not yet. + for (SnapshotFutureTask sctx : locSnpTasks.values()) + sctx.acceptException(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG)); + + locSnpTasks.clear(); + + RemoteSnapshotFuture snpTrFut = rmtSnpReq.get(); + + if (snpTrFut != null) + snpTrFut.cancel(); + + synchronized (snpOpMux) { + if (clusterSnpFut != null) { + clusterSnpFut.onDone(new NodeStoppingException(SNP_NODE_STOPPING_ERR_MSG)); + + clusterSnpFut = null; + } + } + + if (snpRunner != null) + snpRunner.shutdownNow(); + + cctx.kernalContext().io().removeMessageListener(DFLT_INITIAL_SNAPSHOT_TOPIC); + cctx.kernalContext().io().removeTransmissionHandler(DFLT_INITIAL_SNAPSHOT_TOPIC); + + if (discoLsnr != null) + cctx.kernalContext().event().removeDiscoveryEventListener(discoLsnr); + + cctx.exchange().unregisterExchangeAwareComponent(this); + } + finally { + busyLock.unblock(); + } + } + + /** + * Concurrently traverse the snapshot directory for given local node folder name and Review comment: See no concurrency here ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services