Vladsz83 commented on code in PR #11897: URL: https://github.com/apache/ignite/pull/11897#discussion_r2230219611
########## modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java:
##########
@@ -584,6 +634,665 @@ public void testClusterSnapshotCheckMultipleTimes() throws Exception {
         assertTrue("Threads created: " + createdThreads, createdThreads < iterations);
     }

+    /** Tests that concurrent snapshot full checks are declined for the same snapshot. */
+    @Test
+    public void testConcurrentTheSameSnpFullChecksDeclined() throws Exception {
+        // 0 - coordinator; 0,1,2 - baselines; 3 - non-baseline; 4,5 - clients.
+        prepareGridsAndSnapshot(4, 3, 2, false);
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
+                    () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent full checks of the same snapshot, normal and incremental, are declined. */
+    @Test
+    public void testConcurrentTheSameSnpFullAndIncrementalChecksDeclined() throws Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
+                    () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent full checks of the same incremental snapshot are declined. */
+    @Test
+    public void testConcurrentTheSameIncrementalFullChecksDeclined() throws Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent checks of different incremental snapshots are declined. */
+    @Test
+    public void testConcurrentDifferentIncrementalFullChecksDeclined() throws Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 2)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent full restoration of a normal snapshot and check of an incremental one are declined. */
+    @Test
+    public void testConcurrentTheSameSnpIncrementalCheckAndFullRestoreDeclined() throws Exception {
+        assumeFalse(encryption);
+
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createIncrementalSnapshot(SNAPSHOT_NAME).get();
+
+        grid(0).destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        // Snapshot restoration is disallowed from client nodes.
+        for (int i = 0; i < 3; ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> snp(grid(i0)).restoreSnapshot(SNAPSHOT_NAME, null, null, 0, true),
+                    () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME, null, 1)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    true,
+                    false,
+                    null,
+                    () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent snapshot full checks are allowed for different snapshots. */
+    @Test
+    public void testConcurrentDifferentSnpFullChecksAllowed() throws Exception {
+        // 0 - coordinator; 0,1 - baselines; 2 - non-baseline; 3,4 - clients.
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createSnapshot(SNAPSHOT_NAME + '2').get();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            for (int j = 1; j < G.allGrids().size() - 1; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
+                    () -> new IgniteFutureImpl<>(snp(grid(j0)).checkSnapshot(SNAPSHOT_NAME + '2', null)),
+                    CHECK_SNAPSHOT_METAS,
+                    CHECK_SNAPSHOT_PARTS,
+                    false,
+                    true,
+                    null,
+                    null
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent snapshot full check and restoration (without full checking) are allowed for different snapshots. */
+    @Test
+    public void testConcurrentDifferentSnpFullCheckAndRestorationAllowed() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(3)).createSnapshot(SNAPSHOT_NAME + '2').get();
+
+        grid(0).destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        for (int i = 0; i < G.allGrids().size(); ++i) {
+            // Snapshot restoration is disallowed from client nodes.
+            for (int j = 1; j < 3; ++j) {
+                int i0 = i;
+                int j0 = j;
+
+                doTestConcurrentSnpCheckOperations(
+                    () -> new IgniteFutureImpl<>(snp(grid(i0)).checkSnapshot(SNAPSHOT_NAME, null)),
+                    () -> snp(grid(j0)).restoreSnapshot(SNAPSHOT_NAME + '2', null),
+                    CHECK_SNAPSHOT_PARTS,
+                    RESTORE_CACHE_GROUP_SNAPSHOT_START,
+                    false,
+                    false,
+                    null,
+                    () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+                );
+            }
+        }
+    }
+
+    /** Tests that concurrent snapshot full check and full restoration (with checking) are allowed for different snapshots.
+     */
+    @Test
+    public void testConcurrentDifferentSnpCheckAndFullRestorationAllowed() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        snp(grid(0)).createSnapshot(SNAPSHOT_NAME + '2').get();
+
+        grid(0).destroyCache(DEFAULT_CACHE_NAME);
+
+        awaitPartitionMapExchange();
+
+        doTestConcurrentSnpCheckOperations(
+            () -> new IgniteFutureImpl<>(snp(grid(0)).checkSnapshot(SNAPSHOT_NAME, null)),
+            () -> snp(grid(1)).restoreSnapshot(SNAPSHOT_NAME + '2', null, null, 0, true),
+            CHECK_SNAPSHOT_METAS,
+            CHECK_SNAPSHOT_PARTS,
+            false,
+            true,
+            null,
+            () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+        );
+    }
+
+    /** Tests that concurrent snapshot full restoration (with checking) is declined when the same snapshot is being fully checked. */
+    @Test
+    public void testConcurrentFullCheckAndFullRestoreDeclined() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        doTestConcurrentSnpCheckOperations(
+            () -> new IgniteFutureImpl<>(snp(grid(0)).checkSnapshot(SNAPSHOT_NAME, null)),
+            () -> snp(grid(1)).restoreSnapshot(SNAPSHOT_NAME, null, null, 0, true),
+            CHECK_SNAPSHOT_METAS,
+            CHECK_SNAPSHOT_PARTS,
+            true,
+            false,
+            null,
+            () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+        );
+    }
+
+    /** Tests that concurrent snapshot full check is declined when the same snapshot is being fully restored (checked). */
+    @Test
+    public void testConcurrentTheSameSnpFullCheckWhenFullyRestoringDeclined() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 2, true);
+
+        doTestConcurrentSnpCheckOperations(
+            () -> snp(grid(0)).restoreSnapshot(SNAPSHOT_NAME, null, null, 0, true),
+            () -> new IgniteFutureImpl<>(snp(grid(1)).checkSnapshot(SNAPSHOT_NAME, null)),
+            CHECK_SNAPSHOT_METAS,
+            CHECK_SNAPSHOT_PARTS,
+            true,
+            false,
+            null,
+            () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+        );
+    }
+
+    /** Tests that concurrent full check and restoration (without full checking) of the same snapshot are declined. */
+    @Test
+    public void testConcurrentTheSameSnpFullCheckAndRestoreDeclined() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 2, true);
+
+        doTestConcurrentSnpCheckOperations(
+            () -> new IgniteFutureImpl<>(snp(grid(0)).checkSnapshot(SNAPSHOT_NAME, null)),
+            () -> snp(grid(1)).restoreSnapshot(SNAPSHOT_NAME, null),
+            CHECK_SNAPSHOT_METAS,
+            CHECK_SNAPSHOT_METAS,
+            true,
+            false,
+            null,
+            () -> grid(0).destroyCache(DEFAULT_CACHE_NAME)
+        );
+    }
+
+    /** Tests that snapshot full check doesn't affect a snapshot creation. */
+    @Test
+    public void testConcurrentDifferentSnpCheckAndCreateAllowed() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 2, false);
+
+        File dir = snapshotFileTree(grid(0), SNAPSHOT_NAME + "_2").root();
+
+        doTestConcurrentSnpCheckOperations(
+            () -> new IgniteFutureImpl<>(snp(grid(0)).checkSnapshot(SNAPSHOT_NAME, null)),
+            () -> snp(grid(1)).createSnapshot(SNAPSHOT_NAME + "_2", null, false, false),
+            CHECK_SNAPSHOT_METAS,
+            null,
+            false,
+            false,
+            () -> U.delete(dir),
+            () -> U.delete(dir)
+        );
+    }
+
+    /** Tests snapshot checking processes a baseline node leave. */
+    @Test
+    public void testBaselineLeavesDuringSnapshotChecking() throws Exception {
+        prepareGridsAndSnapshot(6, 5, 1, false);
+
+        Set<Integer> stopped = new HashSet<>();
+
+        // Snapshot checking started from the coordinator.
+        doTestNodeStopsDuringSnapshotChecking(0, 4, stopped);
+
+        // Snapshot checking started from non-baseline.
+        doTestNodeStopsDuringSnapshotChecking(5, 3, stopped);
+
+        // Snapshot checking started from a client.
+        doTestNodeStopsDuringSnapshotChecking(5, 2, stopped);
+
+        // The same baseline leaves.
+        doTestNodeStopsDuringSnapshotChecking(1, 1, stopped);
+    }
+
+    /** Tests snapshot checking processes a client node leave. */
+    @Test
+    public void testClientLeavesDuringSnapshotChecking() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 6, false);
+
+        Set<Integer> stopped = new HashSet<>();
+
+        // Snapshot checking started from a baseline.
+        doTestNodeStopsDuringSnapshotChecking(1, 8, stopped);
+
+        // Snapshot checking started from a non-baseline.
+        doTestNodeStopsDuringSnapshotChecking(2, 7, stopped);
+
+        // Snapshot checking started from the coordinator.
+        doTestNodeStopsDuringSnapshotChecking(0, 6, stopped);
+
+        // Snapshot checking started from another client.
+        doTestNodeStopsDuringSnapshotChecking(4, 5, stopped);
+
+        // Snapshot checking started from the same client.
+        doTestNodeStopsDuringSnapshotChecking(4, 4, stopped);
+    }
+
+    /** Tests snapshot checking processes a non-baseline node leave. */
+    @Test
+    public void testNonBaselineServerLeavesDuringSnapshotChecking() throws Exception {
+        prepareGridsAndSnapshot(7, 2, 1, false);
+
+        Set<Integer> stopped = new HashSet<>();
+
+        // Snapshot checking started from a server node.
+        doTestNodeStopsDuringSnapshotChecking(1, 6, stopped);
+
+        // Snapshot checking started from a client node.
+        doTestNodeStopsDuringSnapshotChecking(7, 5, stopped);
+
+        // Snapshot checking started from another non-baseline.
+        doTestNodeStopsDuringSnapshotChecking(3, 4, stopped);
+
+        // Snapshot checking started from the coordinator.
+        doTestNodeStopsDuringSnapshotChecking(0, 3, stopped);
+
+        // Snapshot checking started from the same non-baseline.
+        doTestNodeStopsDuringSnapshotChecking(2, 2, stopped);
+    }
+
+    /** Tests snapshot checking process continues when a new baseline node leaves. */
+    @Test
+    public void testNewBaselineServerLeavesDuringSnapshotChecking() throws Exception {
+        prepareGridsAndSnapshot(3, 2, 1, false);
+
+        int grids = G.allGrids().size();
+
+        discoSpi(grid(0)).block(msg -> msg instanceof FullMessage && ((FullMessage<?>)msg).type() == CHECK_SNAPSHOT_METAS.ordinal());
+
+        IgniteInternalFuture<?> fut = snp(grid(3)).checkSnapshot(SNAPSHOT_NAME, null, null, false, 0, true);
+
+        discoSpi(grid(0)).waitBlocked(getTestTimeout());
+
+        grid(0).cluster().setBaselineTopology(Stream.of(grid(0).localNode(), grid(1).localNode(), grid(2).localNode())
+            .collect(Collectors.toList()));
+
+        stopGrid(2);
+
+        assertTrue(waitForCondition(() -> {
+            for (int i = 0; i < grids; ++i) {
+                if (i != 2 && grid(i).cluster().nodes().size() != grids - 1)
+                    return false;
+            }
+
+            return true;
+        }, getTestTimeout()));
+
+        discoSpi(grid(0)).unblock();
+
+        fut.get(getTestTimeout());
+    }
+
+    /** Tests snapshot checking process stops when the coordinator leaves. */
+    @Test
+    public void testCoordinatorLeavesDuringSnapshotChecking() throws Exception {
+        prepareGridsAndSnapshot(5, 4, 1, false);
+
+        Set<Integer> stopped = new HashSet<>();
+
+        // Coordinator leaves when snapshot started from a server node.
+        doTestNodeStopsDuringSnapshotChecking(4, 0, stopped);
+
+        // Coordinator leaves when snapshot started from a client node.
+        assertTrue(U.isLocalNodeCoordinator(grid(1).context().discovery()));
+
+        doTestNodeStopsDuringSnapshotChecking(5, 1, stopped);
+
+        // Coordinator leaves when snapshot started from it.
+        assertTrue(U.isLocalNodeCoordinator(grid(2).context().discovery()));
+
+        doTestNodeStopsDuringSnapshotChecking(2, 2, stopped);
+    }
+
+    /** */
+    private void prepareGridsAndSnapshot(int servers, int baseLineCnt, int clients, boolean removeTheCache) throws Exception {
+        assert baseLineCnt > 0 && baseLineCnt <= servers;
+
+        IgniteEx ignite = null;
+
+        for (int i = 0; i < servers + clients; ++i) {
+            IgniteConfiguration cfg = getConfiguration(getTestIgniteInstanceName(i));
+
+            cfg.setDiscoverySpi(new BlockingCustomMessageDiscoverySpi());
+
+            if (i >= servers)
+                cfg.setClientMode(true);
+
+            ignite = startGrid(cfg);
+
+            if (i == baseLineCnt - 1) {
+                ignite.cluster().state(ACTIVE);
+
+                ignite.cluster().setBaselineTopology(ignite.cluster().topologyVersion());
+            }
+        }
+
+        try (IgniteDataStreamer<Integer, Integer> ds = grid(0).dataStreamer(DEFAULT_CACHE_NAME)) {
+            for (int i = 0; i < 100; ++i)
+                ds.addData(i, i);
+        }
+
+        ignite.snapshot().createSnapshot(SNAPSHOT_NAME).get();
+
+        if (removeTheCache)
+            ignite.destroyCache(DEFAULT_CACHE_NAME);
+    }
+
+    /**
+     * Tests concurrent snapshot operations related to snapshot checking.
+     *
+     * @param originatorOp First snapshot operation on an originator node.
+     * @param trierOp Second concurrent snapshot operation on a trier node.
+     * @param firstDelay First distributed process full message of {@code originatorOp} to delay on the coordinator
+     *        to launch {@code trierOp}.
+     * @param secondDelay Second distributed process full message of {@code originatorOp} to delay on the coordinator
+     *        to launch {@code trierOp} again.
+     * @param expectFailure If {@code true}, the 'snapshot-check-is-in-progress' error is expected during execution of
+     *        {@code trierOp}. Otherwise, {@code trierOp} must successfully finish.
+     * @param waitForBothFirstDelays If {@code true}, {@code firstDelay} is awaited for both concurrent operations before proceeding.
+     * @param step2preparation If not {@code null}, is executed before delaying and waiting for {@code secondDelay}.
+     * @param cleaner If not {@code null}, is executed at the end.
+     */
+    private void doTestConcurrentSnpCheckOperations(
+        Supplier<IgniteFuture<?>> originatorOp,
+        Supplier<IgniteFuture<?>> trierOp,
+        DistributedProcess.DistributedProcessType firstDelay,
+        @Nullable DistributedProcess.DistributedProcessType secondDelay,
+        boolean expectFailure,
+        boolean waitForBothFirstDelays,
+        @Nullable Runnable step2preparation,
+        @Nullable Runnable cleaner
+    ) throws Exception {
+        try {
+            AtomicBoolean firstDelayed = new AtomicBoolean();
+
+            // Block every matching message if both operations are the same. Otherwise, block only the first one.
+            discoSpi(grid(0)).block(
+                msg -> msg instanceof FullMessage
+                    && ((FullMessage<?>)msg).type() == firstDelay.ordinal()
+                    && (waitForBothFirstDelays || firstDelayed.compareAndSet(false, true)));
+
+            IgniteFuture<?> fut = originatorOp.get();
+
+            discoSpi(grid(0)).waitBlocked(getTestTimeout());
+
+            IgniteFuture<?> fut2 = trierOp.get();
+
+            if (expectFailure) {
+                assertThrowsAnyCause(
+                    log,
+                    fut2::get,
+                    IllegalStateException.class,
+                    "Validation of snapshot '" + SNAPSHOT_NAME + "' has already started"
+                );
+
+                if (secondDelay == null) {
+                    discoSpi(grid(0)).unblock();
+
+                    fut.get(getTestTimeout());
+
+                    return;
+                }
+
+                discoSpi(grid(0)).blockNextAndRelease(msg -> msg instanceof FullMessage
+                    && ((FullMessage<?>)msg).type() == secondDelay.ordinal());
+
+                discoSpi(grid(0)).waitBlocked(getTestTimeout());
+
+                if (step2preparation != null)
+                    step2preparation.run();
+
+                assertThrowsAnyCause(
+                    log,
+                    fut2::get,
+                    IllegalStateException.class,
+                    "Validation of snapshot '" + SNAPSHOT_NAME + "' has already started"
+                );
+
+                discoSpi(grid(0)).unblock();
+
+                fut.get(getTestTimeout());
+            }
+            else {
+                if (waitForBothFirstDelays) {
+                    discoSpi(grid(0)).waitBlockedSize(2, getTestTimeout());
+
+                    if (secondDelay != null) {
+                        discoSpi(grid(0)).blockNextAndRelease(msg -> msg instanceof FullMessage
+                            && ((FullMessage<?>)msg).type() == secondDelay.ordinal());
+
+                        discoSpi(grid(0)).waitBlockedSize(2, getTestTimeout());
+                    }
+                }
+                else {
+                    if (secondDelay != null) {
+                        discoSpi(grid(0)).blockNextAndRelease(msg -> msg instanceof FullMessage
+                            && ((FullMessage<?>)msg).type() == secondDelay.ordinal());
+
+                        discoSpi(grid(0)).waitBlocked(getTestTimeout());
+                    }
+                    else
+                        fut2.get();
+                }
+
+                discoSpi(grid(0)).unblock();
+
+                fut2.get();
+
+                fut.get();
+            }
+        }
+        finally {
+            discoSpi(grid(0)).unblock();
+
+            if (cleaner != null)
+                cleaner.run();
+
+            awaitPartitionMapExchange();
+        }
+    }
+
+    /** */
+    private void doTestNodeStopsDuringSnapshotChecking(int originatorIdx, int nodeToStopIdx, Set<Integer> stopped) throws Exception {
+        int grids = G.allGrids().size();
+
+        ClusterNode leaving = grid(nodeToStopIdx).cluster().localNode();
+
+        boolean requredLeft = originatorIdx == nodeToStopIdx || grid(nodeToStopIdx).cluster().currentBaselineTopology().stream()
+            .anyMatch(bl -> bl.consistentId().equals(leaving.consistentId()));
+
+        int coordIdx = -1;
+
+        for (int i = 0; i < grids; ++i) {
+            final IgniteEx g = grid(i);
+
+            // Wait for all nodes to complete checking.
+            waitForCondition(() -> !snp(g).isSnapshotChecking(SNAPSHOT_NAME), 10_000, 50);

Review Comment:
   I think the default check period of 200 ms is OK.
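   A minimal sketch of what this suggestion might look like, assuming the two-argument waitForCondition(cond, timeout) overload from the test utilities is in scope here (it polls at the default interval instead of an explicit one):

   -            waitForCondition(() -> !snp(g).isSnapshotChecking(SNAPSHOT_NAME), 10_000, 50);
   +            // Wait for all nodes to complete checking, polling at the default check interval.
   +            waitForCondition(() -> !snp(g).isSnapshotChecking(SNAPSHOT_NAME), 10_000);

   This drops the hand-picked 50 ms polling period and relies on the framework default the comment refers to.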