This is an automated email from the ASF dual-hosted git repository. ivandasch pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 8cfa8b046a1 IGNITE-18198 Implement snapshots of caches with disk page compression. (#10430) 8cfa8b046a1 is described below commit 8cfa8b046a1b4782ebca75a506247d4331f89e3c Author: Ivan Daschinskiy <ivanda...@apache.org> AuthorDate: Fri Dec 23 14:07:36 2022 +0300 IGNITE-18198 Implement snapshots of caches with disk page compression. (#10430) --- modules/compress/pom.xml | 14 + .../compress/CompressionProcessorImpl.java | 34 ++ .../snapshot/SnapshotCompressionBasicTest.java | 385 +++++++++++++++++++++ .../DiskPageCompressionIntegrationTest.java | 2 + .../testsuites/IgnitePdsCompressionTestSuite.java | 12 + .../processors/cache/CacheCompressionManager.java | 102 ------ .../processors/cache/CacheGroupContext.java | 15 +- .../processors/cache/GridCacheContext.java | 14 - .../processors/cache/GridCacheProcessor.java | 16 +- .../CachePartitionDefragmentationManager.java | 3 +- .../pagemem/PageReadWriteManagerImpl.java | 9 +- .../snapshot/IgniteSnapshotManager.java | 47 ++- .../persistence/snapshot/SnapshotFutureTask.java | 21 +- .../persistence/snapshot/SnapshotMetadata.java | 35 +- .../snapshot/SnapshotPartitionsVerifyHandler.java | 67 +++- .../snapshot/SnapshotRestoreProcess.java | 23 ++ .../wal/reader/StandaloneGridKernalContext.java | 26 +- .../processors/cache/verify/IdleVerifyUtility.java | 23 ++ .../processors/compress/CompressionHandler.java | 130 +++++++ .../snapshot/AbstractSnapshotSelfTest.java | 20 +- .../snapshot/IgniteClusterSnapshotCheckTest.java | 31 +- .../snapshot/IgniteClusterSnapshotDeltaTest.java | 8 +- .../snapshot/IgniteSnapshotManagerSelfTest.java | 5 +- .../IgniteSnapshotWithMetastorageTest.java | 6 +- .../loadtests/hashmap/GridCacheTestContext.java | 2 - .../junits/multijvm/JavaVersionCommandParser.java | 4 +- .../multijvm/JavaVersionCommandParserTest.java | 6 + .../ignite/testsuites/IgniteSnapshotTestSuite.java | 52 ++- .../IgniteSnapshotWithIndexingTestSuite.java | 30 +- 
29 files changed, 960 insertions(+), 182 deletions(-) diff --git a/modules/compress/pom.xml b/modules/compress/pom.xml index 67ac1da1c0d..f9c4db51f42 100644 --- a/modules/compress/pom.xml +++ b/modules/compress/pom.xml @@ -117,6 +117,20 @@ <version>${mockito.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest</artifactId> + <version>${hamcrest.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>${guava.version}</version> + <scope>test</scope> + </dependency> </dependencies> <build> diff --git a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java index 6d53e0ef016..623ff50ee3d 100644 --- a/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java +++ b/modules/compress/src/main/java/org/apache/ignite/internal/processors/compress/CompressionProcessorImpl.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.compress; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; @@ -29,12 +30,15 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.PageUtils; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.CompactablePageIO; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.internal.U; import org.xerial.snappy.Snappy; +import static 
java.nio.file.StandardOpenOption.CREATE; +import static java.nio.file.StandardOpenOption.WRITE; import static org.apache.ignite.configuration.DataStorageConfiguration.MAX_PAGE_SIZE; import static org.apache.ignite.configuration.DiskPageCompression.SKIP_GARBAGE; import static org.apache.ignite.internal.util.GridUnsafe.NATIVE_BYTE_ORDER; @@ -90,6 +94,8 @@ public class CompressionProcessorImpl extends CompressionProcessor { "must be at least 2 times larger than the underlying storage block size (detected to be " + fsBlockSize + " bytes at '" + storagePath + "') for page compression."); } + + checkPunchHole(storagePath, fsBlockSize); } /** {@inheritDoc} */ @@ -170,6 +176,34 @@ public class CompressionProcessorImpl extends CompressionProcessor { return compactPage; } + /** Check if filesystem actually supports punching holes. */ + private void checkPunchHole(Path storagePath, int fsBlockSz) throws IgniteException { + ByteBuffer buffer = null; + File testFile = null; + try { + testFile = File.createTempFile("punch_hole_", null, storagePath.toFile()); + + buffer = GridUnsafe.allocateBuffer(fsBlockSz * 2); + GridUnsafe.zeroMemory(GridUnsafe.bufferAddress(buffer), buffer.capacity()); + + try (RandomAccessFileIO testFileIO = new RandomAccessFileIO(testFile, CREATE, WRITE)) { + testFileIO.writeFully(buffer); + + testFileIO.punchHole(fsBlockSz, fsBlockSz); + } + } + catch (Exception e) { + throw new IgniteException("File system does not support punching holes on path " + storagePath, e); + } + finally { + if (buffer != null) + GridUnsafe.freeBuffer(buffer); + + if (testFile != null) + testFile.delete(); + } + } + /** * @param page Page. * @param compactSize Compacted page size. 
diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java new file mode 100644 index 00000000000..acfb53deb1a --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotCompressionBasicTest.java @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.persistence.snapshot; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardOpenOption; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.QueryEntity; +import org.apache.ignite.cache.QueryIndex; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.GridKernalContextImpl; +import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; +import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIO; +import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.testframework.GridTestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runners.Parameterized; +import org.mockito.Mockito; + +import static org.apache.ignite.configuration.IgniteConfiguration.DFLT_SNAPSHOT_DIRECTORY; +import static 
org.apache.ignite.events.EventType.EVTS_CLUSTER_SNAPSHOT; +import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED; +import static org.apache.ignite.events.EventType.EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED; + +/** */ +public class SnapshotCompressionBasicTest extends AbstractSnapshotSelfTest { + /** */ + protected static final DiskPageCompression DISK_PAGE_COMPRESSION = DiskPageCompression.SNAPPY; + + /** */ + protected static final int PAGE_SIZE = 8 * 1024; + + /** */ + protected static final String SNAPSHOT_WITHOUT_HOLES = "testSnapshotWithoutHoles"; + + /** */ + protected static final String SNAPSHOT_WITH_HOLES = "testSnapshotWithHoles"; + + /** */ + protected static final long TIMEOUT = 30_000; + + /** */ + private static final Map<String, String> CACHES = new HashMap<>(); + + static { + CACHES.put("cache1", "group1"); + CACHES.put("cache2", "group1"); + CACHES.put("cache3", null); + CACHES.put("cache4", null); + } + + /** */ + private static final Set<String> COMPRESSED_CACHES = new HashSet<>(); + + static { + COMPRESSED_CACHES.add("cache1"); + COMPRESSED_CACHES.add("cache2"); + COMPRESSED_CACHES.add("cache3"); + } + + /** */ + @Parameterized.Parameters(name = "Encryption={0}") + public static Collection<Boolean> encryptionParams() { + return Collections.singletonList(false); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + IgniteConfiguration config = super.getConfiguration(igniteInstanceName); + + config.getDataStorageConfiguration().setPageSize(PAGE_SIZE); + + return config; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + cleanPersistenceDir(); + createTestSnapshot(); + } + + /** {@inheritDoc} */ + @Before + @Override public void beforeTestSnapshot() { + locEvts.clear(); + } + + /** {@inheritDoc} */ + @After + @Override public void afterTestSnapshot() throws 
Exception { + if (G.allGrids().isEmpty()) + return; + + IgniteEx ig = grid(0); + for (String cacheName : ig.cacheNames()) { + IgniteCache cache = ig.cache(cacheName); + + cache.destroy(); + } + + stopAllGrids(); + } + + /** */ + @Test + public void testRestoreFullSnapshot() throws Exception { + IgniteEx ignite = startGrids(3); + ignite.events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT); + ignite.cluster().state(ClusterState.ACTIVE); + + for (String snpName: Arrays.asList(SNAPSHOT_WITH_HOLES, SNAPSHOT_WITHOUT_HOLES)) { + try { + ignite.snapshot().restoreSnapshot(snpName, null).get(TIMEOUT); + + waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED); + + for (String cacheName : CACHES.keySet()) { + IgniteCache cache = ignite.cache(cacheName); + + assertCacheKeys(cache, 1000); + + cache.destroy(); + } + } + finally { + locEvts.clear(); + } + } + } + + /** */ + @Test + public void testRestoreFail_OnGridWithoutCompression() throws Exception { + IgniteEx ignite = startGrids(3); + ignite.events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT); + ignite.cluster().state(ClusterState.ACTIVE); + + G.allGrids().forEach(this::failCompressionProcessor); + + for (String snpName: Arrays.asList(SNAPSHOT_WITH_HOLES, SNAPSHOT_WITHOUT_HOLES)) { + GridTestUtils.assertThrows(log, () -> ignite.snapshot().restoreSnapshot(snpName, null).get(TIMEOUT), + IgniteException.class, "Snapshot contains compressed cache groups"); + } + } + + + /** */ + @Test + public void testRestoreNotCompressed_OnGridWithoutCompression() throws Exception { + IgniteEx ignite = startGrids(3); + ignite.events().localListen(e -> locEvts.add(e.type()), EVTS_CLUSTER_SNAPSHOT); + ignite.cluster().state(ClusterState.ACTIVE); + + G.allGrids().forEach(i -> failCompressionProcessor(i)); + + Collection<String> groupsWithoutCompression = CACHES.entrySet().stream() + .filter(e -> !COMPRESSED_CACHES.contains(e.getKey())) + .map(e -> e.getValue() != null ? 
e.getValue() : e.getKey()) + .distinct().collect(Collectors.toList()); + + for (String snpName: Arrays.asList(SNAPSHOT_WITH_HOLES, SNAPSHOT_WITHOUT_HOLES)) { + try { + ignite.snapshot().restoreSnapshot(snpName, groupsWithoutCompression).get(TIMEOUT); + + waitForEvents(EVT_CLUSTER_SNAPSHOT_RESTORE_STARTED, EVT_CLUSTER_SNAPSHOT_RESTORE_FINISHED); + + CACHES.keySet().stream().filter(c -> !COMPRESSED_CACHES.contains(c)).forEach(cacheName -> { + IgniteCache cache = ignite.cache(cacheName); + + assertCacheKeys(cache, 1000); + + cache.destroy(); + }); + } + finally { + locEvts.clear(); + } + } + } + + /** {@inheritDoc} */ + @Override protected Function<Integer, Object> valueBuilder() { + return i -> new Value("name_" + i); + } + + /** */ + protected void createTestSnapshot() throws Exception { + CacheConfiguration[] caches = CACHES.entrySet().stream() + .map(cache -> { + CacheConfiguration config = new CacheConfiguration(cache.getKey()); + + config.setQueryEntities(Collections.singletonList( + new QueryEntity() + .setKeyType(Integer.class.getName()) + .setValueType(Value.class.getName()) + .addQueryField("id", Integer.class.getName(), null) + .addQueryField("name", String.class.getName(), null) + .setIndexes(F.asList(new QueryIndex("name"))) + )); + + if (cache.getValue() != null) + config.setGroupName(cache.getValue()); + + if (COMPRESSED_CACHES.contains(cache.getKey())) + config.setDiskPageCompression(DISK_PAGE_COMPRESSION); + else + config.setDiskPageCompression(DiskPageCompression.DISABLED); + + return config; + }).toArray(CacheConfiguration[]::new); + + IgniteEx ignite = startGridsWithCache(3, 1000, valueBuilder(), caches); + + forceCheckpoint(); + + G.allGrids().forEach(i -> failCompressionProcessor(i, SNAPSHOT_WITHOUT_HOLES)); + + for (String snpName : Arrays.asList(SNAPSHOT_WITH_HOLES, SNAPSHOT_WITHOUT_HOLES)) { + ignite.snapshot().createSnapshot(snpName).get(TIMEOUT); + + IdleVerifyResultV2 res = 
ignite.context().cache().context().snapshotMgr().checkSnapshot(snpName, null) + .get().idleVerifyResult(); + + StringBuilder b = new StringBuilder(); + res.print(b::append, true); + + assertTrue("Exceptions: " + b, F.isEmpty(res.exceptions())); + assertTrue(F.isEmpty(res.exceptions())); + } + + Path withHolesPath = Paths.get(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_SNAPSHOT_DIRECTORY, false) + .toString(), SNAPSHOT_WITH_HOLES); + + Path withoutHolesPath = Paths.get(U.resolveWorkDirectory(U.defaultWorkDirectory(), DFLT_SNAPSHOT_DIRECTORY, false) + .toString(), SNAPSHOT_WITHOUT_HOLES); + + long withHolesSize = directorySize(withHolesPath); + long withoutHolesSize = directorySize(withoutHolesPath); + + assertTrue("withHolesSize < withoutHolesSize: " + withHolesSize + " < " + withoutHolesSize, + withHolesSize < withoutHolesSize); + + long idxWithHolesSize = directorySize(withHolesPath, "index\\.bin"); + long idxWithoutHolesSize = directorySize(withoutHolesPath, "index\\.bin"); + + assertTrue("idxWithHolesSize < idxWithoutHolesSize: " + idxWithHolesSize + " < " + idxWithoutHolesSize, + idxWithHolesSize < idxWithoutHolesSize); + + ignite.cacheNames().forEach(c -> ignite.getOrCreateCache(c).destroy()); + + G.stopAll(true); + } + + /** */ + private void failCompressionProcessor(Ignite ignite, String... 
snpNames) { + CompressionProcessor compressProc = ((IgniteEx)ignite).context().compress(); + + CompressionProcessor spyCompressProc = Mockito.spy(compressProc); + + if (F.isEmpty(snpNames)) { + try { + Mockito.doAnswer(inv -> { + throw new IgniteCheckedException(new IgniteException("errno: -12")); + }).when(spyCompressProc).checkPageCompressionSupported(); + + Mockito.doAnswer(inv -> { + throw new IgniteCheckedException(new IgniteException("errno: -12")); + }).when(spyCompressProc).checkPageCompressionSupported(Mockito.any(), Mockito.anyInt()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + else { + for (String snpName : snpNames) { + try { + Mockito.doAnswer(inv -> { + if (snpName != null && ((Path)inv.getArgument(0)).endsWith(snpName)) + throw new IgniteCheckedException(new IgniteException("errno: -12")); + return null; + }).when(spyCompressProc).checkPageCompressionSupported(Mockito.any(), Mockito.anyInt()); + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + } + } + + ((GridKernalContextImpl)((IgniteEx)ignite).context()).add(spyCompressProc); + } + + /** */ + private static long directorySize(Path path) throws IOException { + return directorySize(path, null); + } + + /** */ + private static long directorySize(Path path, String pattern) throws IOException { + try (Stream<Path> walk = Files.walk(path)) { + return walk.filter(Files::isRegularFile) + .filter(f -> F.isEmpty(pattern) || f.getFileName().toString().matches(pattern)) + .mapToLong(p -> { + try (FileIO fio = new RandomAccessFileIO(p.toFile(), StandardOpenOption.READ)) { + return fio.getSparseSize(); + } + catch (IOException e) { + throw new RuntimeException(e); + } + }).sum(); + } + } + + /** */ + private static class Value { + /** */ + String name; + + /** */ + Value(String name) { + this.name = name; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (o == null || getClass() != 
o.getClass()) + return false; + + Value value = (Value)o; + + return Objects.equals(name, value.name); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return Objects.hash(name); + } + } +} diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java index d833a337a58..00a2ae2cd93 100644 --- a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/DiskPageCompressionIntegrationTest.java @@ -63,12 +63,14 @@ public class DiskPageCompressionIntegrationTest extends AbstractPageCompressionI /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteName) throws Exception { DataRegionConfiguration drCfg = new DataRegionConfiguration() + .setMetricsEnabled(true) .setPersistenceEnabled(true); factory = getFileIOFactory(); DataStorageConfiguration dsCfg = new DataStorageConfiguration() .setPageSize(MAX_PAGE_SIZE) + .setMetricsEnabled(true) .setDefaultDataRegionConfiguration(drCfg) .setFileIOFactory(U.isLinux() ? 
factory : new PunchFileIOFactory(factory)); diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java index a13427c4624..aae13a64ac0 100644 --- a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java +++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java @@ -18,11 +18,15 @@ package org.apache.ignite.testsuites; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import org.apache.ignite.internal.processors.cache.persistence.db.wal.IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalCompactionAndPageCompressionTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionAndTdeTest; import org.apache.ignite.internal.processors.cache.persistence.db.wal.WalRecoveryWithPageCompressionTest; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.EncryptedSnapshotTest; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.PlainSnapshotTest; +import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotCompressionBasicTest; import org.apache.ignite.internal.processors.compress.CompressionConfigurationTest; import org.apache.ignite.internal.processors.compress.CompressionProcessorTest; import org.apache.ignite.internal.processors.compress.DiskPageCompressionConfigValidationTest; @@ -60,7 +64,15 @@ public class IgnitePdsCompressionTestSuite { suite.add(IgnitePdsCheckpointSimulationWithRealCpDisabledAndWalCompressionTest.class); suite.add(WalCompactionAndPageCompressionTest.class); + // Snapshots. + suite.add(SnapshotCompressionBasicTest.class); + + //Snapshot tests from common suites. 
enableCompressionByDefault(); + IgniteSnapshotTestSuite.addSnapshotTests(suite, Arrays.asList(PlainSnapshotTest.class, EncryptedSnapshotTest.class)); + IgniteSnapshotWithIndexingTestSuite.addSnapshotTests(suite, null); + + // PDS test suite with compression. IgnitePdsTestSuite.addRealPageStoreTests(suite, null); return suite; diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java deleted file mode 100644 index 3bf2b8bbb48..00000000000 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.cache; - -import java.io.File; -import java.nio.ByteBuffer; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.configuration.CacheConfiguration; -import org.apache.ignite.configuration.DataStorageConfiguration; -import org.apache.ignite.configuration.DiskPageCompression; -import org.apache.ignite.internal.pagemem.store.PageStore; -import org.apache.ignite.internal.processors.compress.CompressionProcessor; -import org.apache.ignite.internal.util.typedef.internal.CU; -import org.apache.ignite.internal.util.typedef.internal.U; - -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds; -import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel; - -/** - * Cache compression manager. - */ -public class CacheCompressionManager extends GridCacheManagerAdapter { - /** */ - private DiskPageCompression diskPageCompression; - - /** */ - private int diskPageCompressLevel; - - /** */ - private CompressionProcessor compressProc; - - /** {@inheritDoc} */ - @Override protected void start0() throws IgniteCheckedException { - CacheConfiguration cfg = cctx.config(); - - if (cctx.kernalContext().clientNode() || !CU.isPersistentCache(cfg, cctx.gridConfig().getDataStorageConfiguration())) { - diskPageCompression = DiskPageCompression.DISABLED; - - return; - } - - compressProc = cctx.kernalContext().compress(); - - diskPageCompression = cctx.kernalContext().config().isClientMode() ? null : cfg.getDiskPageCompression(); - - if (diskPageCompression != DiskPageCompression.DISABLED) { - if (!cctx.dataRegion().config().isPersistenceEnabled()) - throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence."); - - Integer lvl = cfg.getDiskPageCompressionLevel(); - diskPageCompressLevel = lvl != null ? 
- checkCompressionLevelBounds(lvl, diskPageCompression) : - getDefaultCompressionLevel(diskPageCompression); - - DataStorageConfiguration dsCfg = cctx.kernalContext().config().getDataStorageConfiguration(); - - File dbPath = cctx.kernalContext().pdsFolderResolver().resolveFolders().persistentStoreRootPath(); - - assert dbPath != null; - - compressProc.checkPageCompressionSupported(dbPath.toPath(), dsCfg.getPageSize()); - - if (log.isInfoEnabled()) { - log.info("Disk page compression is enabled [cache=" + cctx.name() + - ", compression=" + diskPageCompression + ", level=" + diskPageCompressLevel + "]"); - } - } - } - - /** - * @param page Page buffer. - * @param store Page store. - * @return Compressed or the same buffer. - * @throws IgniteCheckedException If failed. - */ - public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException { - if (diskPageCompression == DiskPageCompression.DISABLED) - return page; - - int blockSize = store.getBlockSize(); - - if (blockSize <= 0) - throw new IgniteCheckedException("Failed to detect storage block size on " + U.osString()); - - return compressProc.compressPage(page, store.getPageSize(), blockSize, diskPageCompression, diskPageCompressLevel); - } -} diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index e26a8b93ef4..28dc32fe20f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -56,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapM import org.apache.ignite.internal.processors.cache.persistence.freelist.FreeList; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import 
org.apache.ignite.internal.processors.cache.query.continuous.CounterSkipContext; +import org.apache.ignite.internal.processors.compress.CompressionHandler; import org.apache.ignite.internal.processors.metric.GridMetricManager; import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor; import org.apache.ignite.internal.processors.query.QueryUtils; @@ -71,6 +72,7 @@ import org.apache.ignite.lang.IgniteFuture; import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.CacheTopologyValidatorProvider; import org.jetbrains.annotations.Nullable; + import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT; import static org.apache.ignite.cache.CacheMode.REPLICATED; @@ -192,6 +194,9 @@ public class CacheGroupContext { /** Topology validators. */ private final Collection<TopologyValidator> topValidators; + /** Disk page compression method. */ + private final CompressionHandler compressHandler; + /** * @param ctx Context. * @param grpId Group ID. @@ -206,6 +211,7 @@ public class CacheGroupContext { * @param locStartVer Topology version when group was started on local node. * @param persistenceEnabled Persistence enabled flag. * @param walEnabled Wal enabled flag. + * @param compressHandler Compression handler. 
*/ public CacheGroupContext( GridCacheSharedContext ctx, @@ -221,7 +227,8 @@ public class CacheGroupContext { AffinityTopologyVersion locStartVer, boolean persistenceEnabled, boolean walEnabled, - boolean recoveryMode + boolean recoveryMode, + CompressionHandler compressHandler ) { assert ccfg != null; assert dataRegion != null || !affNode; @@ -242,6 +249,7 @@ public class CacheGroupContext { this.persistenceEnabled = persistenceEnabled; this.localWalEnabled = true; this.recoveryMode = new AtomicBoolean(recoveryMode); + this.compressHandler = compressHandler; ioPlc = cacheType.ioPolicy(); @@ -1329,6 +1337,11 @@ public class CacheGroupContext { return ctx.wal(cdcEnabled()); } + /** */ + public CompressionHandler compressionHandler() { + return compressHandler; + } + /** * @param ccfg Cache configuration. * @param plugins Ignite plugin processor. diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 89f15fe57ff..fb48c74a141 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -186,9 +186,6 @@ public class GridCacheContext<K, V> implements Externalizable { /** Store manager. */ private CacheStoreManager storeMgr; - /** Compression manager. */ - private CacheCompressionManager compressMgr; - /** Replication manager. */ private GridCacheDrManager drMgr; @@ -339,8 +336,6 @@ public class GridCacheContext<K, V> implements Externalizable { * Managers in starting order! 
* =========================== */ - - CacheCompressionManager compressMgr, GridCacheEventManager evtMgr, CacheStoreManager storeMgr, CacheEvictionManager evictMgr, @@ -359,7 +354,6 @@ public class GridCacheContext<K, V> implements Externalizable { assert cacheCfg != null; assert locStartTopVer != null : cacheCfg.getName(); - assert compressMgr != null; assert grp != null; assert evtMgr != null; assert storeMgr != null; @@ -386,7 +380,6 @@ public class GridCacheContext<K, V> implements Externalizable { * Managers in starting order! * =========================== */ - this.compressMgr = add(compressMgr); this.evtMgr = add(evtMgr); this.storeMgr = add(storeMgr); this.evictMgr = add(evictMgr); @@ -1236,13 +1229,6 @@ public class GridCacheContext<K, V> implements Externalizable { return EMPTY_VERSION; } - /** - * @return Compression manager. - */ - public CacheCompressionManager compress() { - return compressMgr; - } - /** * Sets cache object context. * diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index c5a56bceccf..0ff4fda1727 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -141,6 +141,7 @@ import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProces import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; +import org.apache.ignite.internal.processors.compress.CompressionHandler; import org.apache.ignite.internal.processors.datastructures.DataStructuresProcessor; import org.apache.ignite.internal.processors.metric.MetricRegistry; import 
org.apache.ignite.internal.processors.platform.cache.PlatformCacheManager; @@ -194,6 +195,7 @@ import org.apache.ignite.spi.systemview.view.CachePagesListView; import org.apache.ignite.spi.systemview.view.PartitionStateView; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; + import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Objects.isNull; @@ -1269,7 +1271,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg); - CacheCompressionManager compressMgr = new CacheCompressionManager(); GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); GridCacheEventManager evtMgr = new GridCacheEventManager(); CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled()) @@ -1309,7 +1310,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Managers in starting order! * =========================== */ - compressMgr, evtMgr, storeMgr, evictMgr, @@ -1427,7 +1427,6 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Managers in starting order! * =========================== */ - compressMgr, evtMgr, storeMgr, evictMgr, @@ -2480,6 +2479,14 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean persistenceEnabled = recoveryMode || sharedCtx.localNode().isClient() ? 
desc.persistenceEnabled() : dataRegion != null && dataRegion.config().isPersistenceEnabled(); + CompressionHandler compressHandler = CompressionHandler.create(ctx, cfg); + + if (log.isInfoEnabled() && compressHandler.compressionEnabled()) { + log.info("Disk page compression is enabled [cacheGrp=" + CU.cacheOrGroupName(cfg) + + ", compression=" + compressHandler.diskPageCompression() + ", level=" + + compressHandler.diskPageCompressionLevel() + "]"); + } + CacheGroupContext grp = new CacheGroupContext(sharedCtx, desc.groupId(), desc.receivedFrom(), @@ -2493,7 +2500,8 @@ public class GridCacheProcessor extends GridProcessorAdapter { exchTopVer, persistenceEnabled, desc.walEnabled(), - recoveryMode + recoveryMode, + compressHandler ); for (Object obj : grp.configuredUserObjects()) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java index 6d5185ac76d..70de48d2aaf 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/defragmentation/CachePartitionDefragmentationManager.java @@ -362,7 +362,8 @@ public class CachePartitionDefragmentationManager { oldGrpCtx.localStartVersion(), true, false, - true + true, + oldGrpCtx.compressionHandler() ); defragmentationCheckpoint.checkpointTimeoutLock().checkpointReadLock(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageReadWriteManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageReadWriteManagerImpl.java index 63ff696e617..4e19c286b74 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageReadWriteManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageReadWriteManagerImpl.java @@ -25,7 +25,7 @@ import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.pagemem.PageIdUtils; import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.pagemem.store.PageStoreCollection; -import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -93,12 +93,11 @@ public class PageReadWriteManagerImpl implements PageReadWriteManager { int pageSize = store.getPageSize(); int compressedPageSize = pageSize; - GridCacheContext<?, ?> cctx0 = ctx.cache().context().cacheContext(grpId); - - if (cctx0 != null) { + CacheGroupContext grpCtx = ctx.cache().cacheGroup(grpId); + if (grpCtx != null) { assert pageBuf.position() == 0 && pageBuf.limit() == pageSize : pageBuf; - ByteBuffer compressedPageBuf = cctx0.compress().compressPage(pageBuf, store); + ByteBuffer compressedPageBuf = grpCtx.compressionHandler().compressPage(pageBuf, store); if (compressedPageBuf != pageBuf) { compressedPageSize = PageIO.getCompressedSize(compressedPageBuf); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java index 26270b900bd..4bf57ead781 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java @@ -81,6 +81,7 @@ import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeTask; import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.events.DiscoveryEvent; import org.apache.ignite.events.SnapshotEvent; @@ -142,6 +143,7 @@ import org.apache.ignite.internal.processors.cache.tree.DataRow; import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState; import org.apache.ignite.internal.processors.cluster.IgniteChangeGlobalStateSupport; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.configuration.distributed.DistributedConfigurationLifecycleListener; import org.apache.ignite.internal.processors.configuration.distributed.DistributedLongProperty; import org.apache.ignite.internal.processors.configuration.distributed.DistributedPropertyDispatcher; @@ -795,6 +797,10 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter } List<Integer> grpIds = new ArrayList<>(F.viewReadOnly(req.groups(), CU::cacheId)); + Collection<Integer> comprGrpIds = F.view(grpIds, i -> { + CacheGroupDescriptor desc = cctx.cache().cacheGroupDescriptor(i); + return desc != null && desc.config().getDiskPageCompression() != DiskPageCompression.DISABLED; + }); Set<Integer> leftGrps = new HashSet<>(grpIds); leftGrps.removeAll(cctx.cache().cacheGroupDescriptors().keySet()); @@ -853,6 +859,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter pdsSettings.folderName(), cctx.gridConfig().getDataStorageConfiguration().getPageSize(), grpIds, + comprGrpIds, blts, 
(Set<GroupPartitionId>)fut.result(), cctx.gridConfig().getEncryptionSpi().masterKeyDigest() @@ -1458,6 +1465,27 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter return; } + if (meta.hasCompressedGroups() && grpIds.keySet().stream().anyMatch(meta::isGroupWithCompresion)) { + try { + kctx0.compress().checkPageCompressionSupported(); + } + catch (IgniteCheckedException e) { + String grpWithCompr = grpIds.entrySet().stream() + .filter(grp -> meta.isGroupWithCompresion(grp.getKey())) + .map(Map.Entry::getValue).collect(Collectors.joining(", ")); + + String msg = "Requested cache groups [" + grpWithCompr + "] for check " + + "from snapshot '" + meta.snapshotName() + "' are compressed while " + + "disk page compression is disabled. To check these groups please " + + "start Ignite with ignite-compress module in classpath"; + + res.onDone(new SnapshotPartitionsVerifyTaskResult(metas, new IdleVerifyResultV2( + Collections.singletonMap(cctx.localNode(), new IllegalArgumentException(msg))))); + + return; + } + } + grpIds.keySet().removeAll(meta.partitions().keySet()); } } @@ -1914,9 +1942,12 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter * @return Standalone kernal context related to the snapshot. * @throws IgniteCheckedException If fails. 
*/ - public StandaloneGridKernalContext createStandaloneKernalContext(File snpDir, String folderName) throws IgniteCheckedException { - return new StandaloneGridKernalContext(log, - resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName), + public StandaloneGridKernalContext createStandaloneKernalContext( + CompressionProcessor cmpProc, + File snpDir, + String folderName + ) throws IgniteCheckedException { + return new StandaloneGridKernalContext(log, cmpProc, resolveBinaryWorkDir(snpDir.getAbsolutePath(), folderName), resolveMappingFileStoreWorkDir(snpDir.getAbsolutePath())); } @@ -2495,6 +2526,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter /** Batch of rows read through iteration. */ private final Deque<CacheDataRow> rows = new LinkedList<>(); + /** */ + private final CompressionProcessor compressProc; + /** {@code true} if the iteration though partition reached its end. */ private boolean secondScanComplete; @@ -2524,6 +2558,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter this.partId = partId; this.coctx = coctx; this.sctx = sctx; + compressProc = sctx.kernalContext().compress(); store.ensure(); pages = store.pages(); @@ -2670,6 +2705,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter assert read : toDetailString(pageId); + if (PageIO.getCompressionType(buff) != CompressionProcessor.UNCOMPRESSED_PAGE) + compressProc.decompressPage(buff, store.getPageSize()); + return getType(buff) == flag(pageId); } @@ -3399,8 +3437,9 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter transferRateLimiter.acquire(pageSize); ByteBuffer page = deltaIter.next(); + long pageId = PageIO.getPageId(page); - pageStore.write(PageIO.getPageId(page), page, 0, false); + pageStore.write(pageId, page, 0, false); } pageStore.finishRecover(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java index 797fcba2d18..7f2477d3331 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotFutureTask.java @@ -68,8 +68,10 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaS import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.processors.marshaller.MappedName; import org.apache.ignite.internal.processors.metastorage.persistence.DistributedMetaStorageImpl; +import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.IgniteThrowableRunner; import org.apache.ignite.internal.util.tostring.GridToStringExclude; @@ -943,12 +945,27 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId if (!store.read(pageId, locBuf, true)) return; - locBuf.flip(); + locBuf.clear(); writePage0(pageId, locBuf); } else { // Direct buffer is needs to be written, associated checkpoint not finished yet. 
+ if (PageIO.getCompressionType(GridUnsafe.bufferAddress(buf)) != CompressionProcessor.UNCOMPRESSED_PAGE) { + final ByteBuffer locBuf = locBuff.get(); + + assert locBuf.capacity() == store.getPageSize(); + + locBuf.clear(); + + GridUnsafe.copyOffheapOffheap(GridUnsafe.bufferAddress(buf), GridUnsafe.bufferAddress(locBuf), buf.limit()); + + locBuf.limit(locBuf.capacity()); + locBuf.position(0); + + buf = locBuf; + } + writePage0(pageId, buf); // Page marked as written to delta file, so there is no need to @@ -990,6 +1007,8 @@ class SnapshotFutureTask extends AbstractSnapshotFutureTask<Set<GroupPartitionId // Write buffer to the end of the file. int len = deltaFileIo.writeFully(pageBuf); + assert len == pageBuf.capacity(); + totalSize.addAndGet(len); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java index 0a74d50cdb2..fd68e9a3219 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotMetadata.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InvalidObjectException; import java.io.Serializable; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -31,6 +32,7 @@ import java.util.Set; import java.util.UUID; import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.jetbrains.annotations.Nullable; @@ -83,6 +85,12 @@ public class 
SnapshotMetadata implements Serializable { @GridToStringInclude @Nullable private List<String> warnings; + /** */ + private transient Set<Integer> comprGrpIds; + + /** */ + private boolean hasComprGrps; + /** * F@param snpName Snapshot name. * @param consId Consistent id of a node to which this metadata relates. @@ -99,6 +107,7 @@ public class SnapshotMetadata implements Serializable { String folderName, int pageSize, List<Integer> grpIds, + Collection<Integer> compGrpIds, Set<String> bltNodes, Set<GroupPartitionId> pairs, @Nullable byte[] masterKeyDigest @@ -112,6 +121,12 @@ public class SnapshotMetadata implements Serializable { this.bltNodes = bltNodes; this.masterKeyDigest = masterKeyDigest; + if (!F.isEmpty(compGrpIds)) { + hasComprGrps = true; + + comprGrpIds = new HashSet<>(compGrpIds); + } + pairs.forEach(p -> locParts.computeIfAbsent(p.getGroupId(), k -> new HashSet<>()) .add(p.getPartitionId())); @@ -174,6 +189,16 @@ public class SnapshotMetadata implements Serializable { return Collections.unmodifiableMap(locParts); } + /** */ + public boolean isGroupWithCompresion(int grpId) { + return hasComprGrps && comprGrpIds.contains(grpId); + } + + /** */ + public boolean hasCompressedGroups() { + return hasComprGrps; + } + /** Save the state of this <tt>HashMap</tt> partitions and cache groups to a stream. */ private void writeObject(java.io.ObjectOutputStream s) throws java.io.IOException { @@ -191,6 +216,9 @@ public class SnapshotMetadata implements Serializable { for (Integer partId : e.getValue()) s.writeInt(partId); } + + if (hasComprGrps) + U.writeCollection(s, comprGrpIds); } /** Reconstitute the <tt>HashMap</tt> instance of partitions and cache groups from a stream. 
*/ @@ -221,6 +249,9 @@ public class SnapshotMetadata implements Serializable { locParts.put(grpId, parts); } + + if (hasComprGrps) + comprGrpIds = U.readSet(s); } /** @@ -275,7 +306,9 @@ public class SnapshotMetadata implements Serializable { Objects.equals(grpIds, meta.grpIds) && Objects.equals(bltNodes, meta.bltNodes) && Arrays.equals(masterKeyDigest, meta.masterKeyDigest) && - Objects.equals(warnings, meta.warnings); + Objects.equals(warnings, meta.warnings) && + Objects.equals(hasComprGrps, hasComprGrps) && + Objects.equals(comprGrpIds, comprGrpIds); } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java index 1a0ea4f1e68..90a7b71fe4e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotPartitionsVerifyHandler.java @@ -54,6 +54,7 @@ import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageParti import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2; import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.util.GridStringBuilder; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.typedef.F; @@ -145,13 +146,16 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part ", meta=" + meta + ']'); } + boolean punchHoleEnabled = isPunchHoleEnabled(opCtx, grpDirs.keySet()); + Map<PartitionKeyV2, PartitionHashRecordV2> res = new 
ConcurrentHashMap<>(); ThreadLocal<ByteBuffer> buff = ThreadLocal.withInitial(() -> ByteBuffer.allocateDirect(meta.pageSize()) .order(ByteOrder.nativeOrder())); IgniteSnapshotManager snpMgr = cctx.snapshotMgr(); - GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(opCtx.snapshotDirectory(), meta.folderName()); + GridKernalContext snpCtx = snpMgr.createStandaloneKernalContext(cctx.kernalContext().compress(), + opCtx.snapshotDirectory(), meta.folderName()); FilePageStoreManager storeMgr = (FilePageStoreManager)cctx.pageStore(); @@ -173,14 +177,38 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part (FilePageStore)storeMgr.getPageStoreFactory(grpId, snpEncrKeyProvider.getActiveKey(grpId) != null ? snpEncrKeyProvider : null).createPageStore(getTypeByPartId(partId), part::toPath, val -> {}) ) { + pageStore.init(); + + if (punchHoleEnabled && meta.isGroupWithCompresion(grpId) && type() == SnapshotHandlerType.CREATE) { + byte pageType = partId == INDEX_PARTITION ? FLAG_IDX : FLAG_DATA; + + checkPartitionsPageCrcSum(() -> pageStore, partId, pageType, (id, buffer) -> { + if (PageIO.getCompressionType(buffer) == CompressionProcessor.UNCOMPRESSED_PAGE) + return; + + int comprPageSz = PageIO.getCompressedSize(buffer); + + if (comprPageSz < pageStore.getPageSize()) { + try { + pageStore.punchHole(id, comprPageSz); + } + catch (Exception ignored) { + // No-op. 
+ } + } + }); + } + if (partId == INDEX_PARTITION) { - checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); + if (!skipHash()) + checkPartitionsPageCrcSum(() -> pageStore, INDEX_PARTITION, FLAG_IDX); return null; } if (grpId == MetaStorage.METASTORAGE_CACHE_ID) { - checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); + if (!skipHash()) + checkPartitionsPageCrcSum(() -> pageStore, partId, FLAG_DATA); return null; } @@ -191,6 +219,9 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part long pageAddr = GridUnsafe.bufferAddress(pageBuff); + if (PageIO.getCompressionType(pageBuff) != CompressionProcessor.UNCOMPRESSED_PAGE) + snpCtx.compress().decompressPage(pageBuff, pageStore.getPageSize()); + PagePartitionMetaIO io = PageIO.getPageIO(pageBuff); GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr)); @@ -285,6 +316,36 @@ public class SnapshotPartitionsVerifyHandler implements SnapshotHandler<Map<Part return false; } + /** */ + protected boolean isPunchHoleEnabled(SnapshotHandlerContext opCtx, Set<Integer> grpIds) { + SnapshotMetadata meta = opCtx.metadata(); + Path snapshotDirectory = opCtx.snapshotDirectory().toPath(); + + if (meta.hasCompressedGroups() && grpIds.stream().anyMatch(meta::isGroupWithCompresion)) { + try { + cctx.kernalContext().compress().checkPageCompressionSupported(); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Snapshot contains compressed cache groups " + + "[grps=[" + grpIds.stream().filter(meta::isGroupWithCompresion).collect(Collectors.toList()) + + "], snpName=" + meta.snapshotName() + "], but compression module is not enabled. 
" + + "Make sure that ignite-compress module is in classpath."); + } + + try { + cctx.kernalContext().compress().checkPageCompressionSupported(snapshotDirectory, meta.pageSize()); + + return true; + } + catch (Exception e) { + log.info("File system doesn't support page compression on snapshot directory: " + snapshotDirectory + + ", snapshot may have larger size than expected."); + } + } + + return false; + } + /** * Provides encryption keys stored within snapshot. * <p> diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java index e53acf4a8ae..b414632bdd4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java @@ -680,6 +680,7 @@ public class SnapshotRestoreProcess { // Collect the cache configurations and prepare a temporary directory for copying files. // Metastorage can be restored only manually by directly copying files. 
+ boolean skipCompressCheck = false; for (SnapshotMetadata meta : metas) { for (File snpCacheDir : cctx.snapshotMgr().snapshotCacheDirectories(req.snapshotName(), req.snapshotPath(), meta.folderName(), name -> !METASTORAGE_CACHE_NAME.equals(name))) { @@ -688,6 +689,28 @@ public class SnapshotRestoreProcess { if (!F.isEmpty(req.groups()) && !req.groups().contains(grpName)) continue; + if (!skipCompressCheck && meta.isGroupWithCompresion(CU.cacheId(grpName))) { + try { + File path = ctx.pdsFolderResolver().resolveFolders().persistentStoreRootPath(); + + ctx.compress().checkPageCompressionSupported(path.toPath(), meta.pageSize()); + } + catch (Exception e) { + String grpWithCompr = req.groups().stream().filter(s -> meta.isGroupWithCompresion(CU.cacheId(grpName))) + .collect(Collectors.joining(", ")); + + String msg = "Requested cache groups [" + grpWithCompr + "] for restore " + + "from snapshot '" + meta.snapshotName() + "' are compressed while " + + "disk page compression is disabled. To restore these groups please " + + "start Ignite with configured disk page compression"; + + throw new IgniteCheckedException(msg); + } + finally { + skipCompressCheck = true; + } + } + File cacheDir = pageStore.cacheWorkDir(snpCacheDir.getName().startsWith(CACHE_GRP_DIR_PREFIX), grpName); if (cacheDir.exists()) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java index ac416515f46..890c1198c38 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/reader/StandaloneGridKernalContext.java @@ -144,8 +144,29 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** Marshaller 
context implementation. */ private MarshallerContextImpl marshallerCtx; + /** */ + @Nullable private CompressionProcessor compressProc; + + /** + * @param log Logger. + * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. + * {@code null} means no specific folder is configured. <br> + * + * @param marshallerMappingFileStoreDir folder specifying location of marshaller mapping file store. + * {@code null} means no specific folder is configured. + * Providing {@code null} will disable unmarshall for non primitive objects, BinaryObjects will be provided <br> + */ + public StandaloneGridKernalContext( + IgniteLogger log, + @Nullable File binaryMetadataFileStoreDir, + @Nullable File marshallerMappingFileStoreDir + ) throws IgniteCheckedException { + this(log, null, binaryMetadataFileStoreDir, marshallerMappingFileStoreDir); + } + /** * @param log Logger. + * @param compressProc Compression processor. * @param binaryMetadataFileStoreDir folder specifying location of metadata File Store. * {@code null} means no specific folder is configured. 
<br> * @@ -155,6 +176,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { */ public StandaloneGridKernalContext( IgniteLogger log, + @Nullable CompressionProcessor compressProc, @Nullable File binaryMetadataFileStoreDir, @Nullable File marshallerMappingFileStoreDir ) throws IgniteCheckedException { @@ -187,6 +209,8 @@ public class StandaloneGridKernalContext implements GridKernalContext { marshallerCtx.setMarshallerMappingFileStoreDir(marshallerMappingFileStoreDir); marshallerCtx.onMarshallerProcessorStarted(this, null); } + + this.compressProc = compressProc; } /** @@ -643,7 +667,7 @@ public class StandaloneGridKernalContext implements GridKernalContext { /** {@inheritDoc} */ @Override public CompressionProcessor compress() { - return null; + return compressProc; } /** {@inheritDoc} */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java index 8d4b8ed755e..e288679e69a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/verify/IdleVerifyUtility.java @@ -27,6 +27,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.BiConsumer; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.internal.IgniteEx; @@ -73,6 +74,25 @@ public class IdleVerifyUtility { IgniteThrowableSupplier<FilePageStore> pageStoreSup, int partId, byte pageType + ) { + checkPartitionsPageCrcSum(pageStoreSup, partId, pageType, null); + } + + /** + * Checks CRC sum of pages with {@code pageType} page type stored in partition with {@code partId} id + * and associated with cache group. + * + * @param pageStoreSup Page store supplier. 
+ * @param partId Partition id. + * @param pageType Page type. Possible types {@link PageIdAllocator#FLAG_DATA}, {@link PageIdAllocator#FLAG_IDX} + * and {@link PageIdAllocator#FLAG_AUX}. + * @param pagePostProcessor Page post processor closure. + */ + public static void checkPartitionsPageCrcSum( + IgniteThrowableSupplier<FilePageStore> pageStoreSup, + int partId, + byte pageType, + @Nullable BiConsumer<Long, ByteBuffer> pagePostProcessor ) { assert pageType == FLAG_DATA || pageType == FLAG_IDX || pageType == FLAG_AUX : pageType; @@ -89,6 +109,9 @@ public class IdleVerifyUtility { buf.clear(); pageStore.read(pageId, buf, true, true); + + if (pagePostProcessor != null) + pagePostProcessor.accept(pageId, buf); } } catch (Throwable e) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java new file mode 100644 index 00000000000..a1dd5146aad --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/compress/CompressionHandler.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.compress; + +import java.io.File; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.internal.GridKernalContext; +import org.apache.ignite.internal.pagemem.store.PageStore; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** */ +public class CompressionHandler { + /** */ + private final CompressionProcessor compressProc; + + /** */ + private final int diskPageCompressLevel; + + /** */ + private final DiskPageCompression diskPageCompression; + + /** */ + private CompressionHandler( + CompressionProcessor compressProc, + DiskPageCompression diskPageCompression, + int diskPageCompressLevel + ) { + this.diskPageCompression = diskPageCompression; + this.diskPageCompressLevel = diskPageCompressLevel; + this.compressProc = compressProc; + } + + /** */ + private CompressionHandler() { + diskPageCompression = DiskPageCompression.DISABLED; + diskPageCompressLevel = 0; + compressProc = null; + } + + /** + * @return Disk page compression algorithm.. + */ + public DiskPageCompression diskPageCompression() { + return diskPageCompression; + } + + /** + * @return Disk page compression level. + */ + public int diskPageCompressionLevel() { + return diskPageCompressLevel; + } + + /** + * @return {@code true} if disk page compression is enabled. + */ + public boolean compressionEnabled() { + return diskPageCompression != DiskPageCompression.DISABLED; + } + + /** + * @param page Page buffer. + * @param store Page store. + * @return Compressed or the same buffer. + * @throws IgniteCheckedException If failed. 
+ */ + public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException { + if (diskPageCompression == DiskPageCompression.DISABLED) + return page; + + int blockSize = store.getBlockSize(); + + if (blockSize <= 0) + throw new IgniteCheckedException("Failed to detect storage block size on " + U.osString()); + + return compressProc.compressPage(page, store.getPageSize(), blockSize, diskPageCompression, diskPageCompressLevel); + } + + /** + * Creates compression handler. + * + * @param ctx Grid kernal context. + * @param cfg Cache or cache group configuration. + * @return Compression handler. + */ + public static CompressionHandler create( + GridKernalContext ctx, + CacheConfiguration cfg + ) throws IgniteCheckedException { + DiskPageCompression diskPageCompr = cfg.getDiskPageCompression(); + DataStorageConfiguration dsCfg = ctx.config().getDataStorageConfiguration(); + + if (ctx.clientNode() || diskPageCompr == DiskPageCompression.DISABLED || !CU.isPersistentCache(cfg, dsCfg)) + return new CompressionHandler(); + + CompressionProcessor comprProc = ctx.compress(); + + int lvl = cfg.getDiskPageCompressionLevel() == null ? 
+ CompressionProcessor.getDefaultCompressionLevel(diskPageCompr) : + CompressionProcessor.checkCompressionLevelBounds(cfg.getDiskPageCompressionLevel(), diskPageCompr); + + File dbPath = ctx.pdsFolderResolver().resolveFolders().persistentStoreRootPath(); + + assert dbPath != null; + + comprProc.checkPageCompressionSupported(dbPath.toPath(), dsCfg.getPageSize()); + + return new CompressionHandler(comprProc, diskPageCompr, lvl); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java index 6b647084ed9..0f2aef16714 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/AbstractSnapshotSelfTest.java @@ -28,6 +28,7 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -48,6 +49,7 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.binary.BinaryType; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheMode; @@ -58,6 +60,7 @@ import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; import org.apache.ignite.configuration.IgniteConfiguration; import 
org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.IgniteEx; @@ -94,6 +97,8 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import static java.nio.file.Files.newDirectoryStream; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION; import static org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction.DFLT_PARTITION_COUNT; import static org.apache.ignite.cluster.ClusterState.ACTIVE; import static org.apache.ignite.cluster.ClusterState.INACTIVE; @@ -124,6 +129,14 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { /** Timeout in milliseconds to await for snapshot operation being completed. */ protected static final long TIMEOUT = 15_000; + /** */ + protected static final DiskPageCompression DISK_PAGE_COMPRESSION = + IgniteSystemProperties.getEnum(IGNITE_DEFAULT_DISK_PAGE_COMPRESSION, DiskPageCompression.DISABLED); + + /** */ + protected static final int PAGE_SIZE = + IgniteSystemProperties.getInteger(IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, DFLT_PAGE_SIZE); + /** List of collected snapshot test events. */ protected final List<Integer> locEvts = new CopyOnWriteArrayList<>(); @@ -153,6 +166,9 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { /** Parameters. 
*/ @Parameterized.Parameters(name = "Encryption={0}") public static Collection<Boolean> encryptionParams() { + if (DISK_PAGE_COMPRESSION != DiskPageCompression.DISABLED) + return Collections.singletonList(false); + return Arrays.asList(false, true); } @@ -171,10 +187,10 @@ public abstract class AbstractSnapshotSelfTest extends GridCommonAbstractTest { .setCommunicationSpi(new TestRecordingCommunicationSpi()) .setDataStorageConfiguration(new DataStorageConfiguration() .setDefaultDataRegionConfiguration(new DataRegionConfiguration() - .setMaxSize(100L * 1024 * 1024) + .setMaxSize(512L * 1024 * 1024) .setPersistenceEnabled(persistence)) .setCheckpointFrequency(3000) - .setPageSize(DFLT_PAGE_SIZE)) + .setPageSize(PAGE_SIZE)) .setClusterStateOnStart(INACTIVE) .setIncludeEventTypes(EVTS_CLUSTER_SNAPSHOT) .setDiscoverySpi(discoSpi); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java index c17f9004c82..af574b63b97 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotCheckTest.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.binary.BinaryContext; import org.apache.ignite.internal.binary.BinaryObjectImpl; import org.apache.ignite.internal.managers.communication.GridMessageListener; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheAdapter; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; @@ -67,6 
+68,7 @@ import org.apache.ignite.internal.processors.cache.verify.PartitionHashRecordV2; import org.apache.ignite.internal.processors.cache.verify.PartitionKeyV2; import org.apache.ignite.internal.processors.cache.verify.VerifyBackupPartitionsTaskV2; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.util.GridUnsafe; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.typedef.F; @@ -257,15 +259,36 @@ public class IgniteClusterSnapshotCheckTest extends AbstractSnapshotSelfTest { buff.clear(); pageStore.read(0, buff, false); - PagePartitionMetaIO io = PageIO.getPageIO(buff); - long pageAddr = GridUnsafe.bufferAddress(buff); + boolean shouldCompress = false; + if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) { + shouldCompress = true; + + ignite.context().compress().decompressPage(buff, pageStore.getPageSize()); + } + + PagePartitionMetaIO io = PageIO.getPageIO(buff); + io.setUpdateCounter(pageAddr, CACHE_KEYS_RANGE * 2); - pageStore.beginRecover(); + if (shouldCompress) { + CacheGroupContext grpCtx = ignite.context().cache().cacheGroup(grpId); - buff.flip(); + assertNotNull("Group context for grpId:" + grpId, grpCtx); + + ByteBuffer compressedPageBuf = grpCtx.compressionHandler().compressPage(buff, pageStore); + + if (compressedPageBuf != buff) { + buff = compressedPageBuf; + + PageIO.setCrc(buff, 0); + } + } + else + buff.flip(); + + pageStore.beginRecover(); pageStore.write(PageIO.getPageId(buff), buff, 0, true); pageStore.finishRecover(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java index 8912396d691..758c90a524d 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotDeltaTest.java @@ -44,7 +44,7 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import static org.apache.ignite.IgniteSystemProperties.IGNITE_SNAPSHOT_SEQUENTIAL_WRITE; -import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; +import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.CACHE_DIR_PREFIX; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.DeltaSortedIterator.DELTA_SORT_BATCH_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.partDeltaIndexFile; @@ -77,7 +77,7 @@ public class IgniteClusterSnapshotDeltaTest extends AbstractSnapshotSelfTest { @Test public void testSendDelta() throws Exception { int keys = 10_000; - byte[] payload = new byte[DFLT_PAGE_SIZE / 2]; + byte[] payload = new byte[PAGE_SIZE / 2]; int partCnt = 2; System.setProperty(IGNITE_SNAPSHOT_SEQUENTIAL_WRITE, String.valueOf(sequentialWrite)); @@ -121,8 +121,8 @@ public class IgniteClusterSnapshotDeltaTest extends AbstractSnapshotSelfTest { } @Override public void sendDelta0(File delta, String cacheDirName, GroupPartitionId pair) { - if (cacheDir.equals(cacheDirName)) - assertTrue(delta.length() > 0); + if (cacheDir.equals(cacheDirName) && pair.getPartitionId() != INDEX_PARTITION) + assertTrue("Delta length : " + delta.length() + " > 0", delta.length() > 0); if (!sequentialWrite) U.delete(partDeltaIndexFile(delta)); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java index 4631a82202f..59923bebe08 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManagerSelfTest.java @@ -68,7 +68,6 @@ import org.apache.ignite.testframework.LogListener; import org.junit.Test; import static java.util.Objects.nonNull; -import static org.apache.ignite.configuration.DataStorageConfiguration.DFLT_PAGE_SIZE; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.CP_SNAPSHOT_REASON; import static org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.SNAPSHOT_RUNNER_THREAD_PREFIX; import static org.apache.ignite.testframework.GridTestUtils.assertThrowsAnyCause; @@ -79,7 +78,7 @@ import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; */ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest { /** The size of value array to fit 3 pages. */ - private static final int SIZE_FOR_FIT_3_PAGES = 12008; + private static final int SIZE_FOR_FIT_3_PAGES = PAGE_SIZE * 2 + PAGE_SIZE / 2; /** Listenning logger. 
*/ private ListeningTestLogger listenLog; @@ -462,7 +461,7 @@ public class IgniteSnapshotManagerSelfTest extends AbstractSnapshotSelfTest { IgniteEx ignite = startGridsWithoutCache(2); - assertEquals(DFLT_PAGE_SIZE, ignite.configuration().getDataStorageConfiguration().getPageSize()); + assertEquals(PAGE_SIZE, ignite.configuration().getDataStorageConfiguration().getPageSize()); for (int i = 0; i < keys; i++) ignite.getOrCreateCache(ccfg).put(i, new Value(new byte[SIZE_FOR_FIT_3_PAGES])); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java index 0c9df199145..6781c2e31d2 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotWithMetastorageTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Function; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; @@ -57,7 +58,10 @@ public class IgniteSnapshotWithMetastorageTest extends AbstractSnapshotSelfTest /** @throws Exception If fails. 
*/ @Test public void testClusterSnapshotWithMetastorage() throws Exception { - IgniteEx ignite = startGridsWithCache(2, dfltCacheCfg, CACHE_KEYS_RANGE); + CacheConfiguration<Integer, Object> cfg2 = txCacheConfig(new CacheConfiguration<>("test")); + + IgniteEx ignite = startGridsWithCache(2, CACHE_KEYS_RANGE, Integer::new, dfltCacheCfg, cfg2); + startClientGrid(); ignite.context().distributedMetastorage().write("key", "value"); diff --git a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java index 018fccbe6c1..0bb13e9710d 100644 --- a/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java +++ b/modules/core/src/test/java/org/apache/ignite/loadtests/hashmap/GridCacheTestContext.java @@ -22,7 +22,6 @@ import org.apache.ignite.cache.store.CacheStore; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheAffinitySharedManager; -import org.apache.ignite.internal.processors.cache.CacheCompressionManager; import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager; import org.apache.ignite.internal.processors.cache.CacheOsConflictResolutionManager; import org.apache.ignite.internal.processors.cache.CacheType; @@ -97,7 +96,6 @@ public class GridCacheTestContext<K, V> extends GridCacheContext<K, V> { true, false, false, - new CacheCompressionManager(), new GridCacheEventManager(), new CacheOsStoreManager(null, new CacheConfiguration()), new GridCacheEvictionManager(), diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParser.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParser.java index 2ce890ca8d5..6b97f9696be 100644 --- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParser.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParser.java @@ -26,7 +26,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; */ class JavaVersionCommandParser { /** Pattern for parsing 'java -version' command output. */ - private static final Pattern versionPattern = Pattern.compile("java version \"([^\"]+)\".*", Pattern.DOTALL); + private static final Pattern versionPattern = Pattern.compile("(java|openjdk) version \"([^\"]+)\".*", Pattern.DOTALL); /** * Extracts major java version (like '17' or '1.8') from 'java -version' output. @@ -41,7 +41,7 @@ class JavaVersionCommandParser { versionCommandOutput + "'"); } - String fullJavaVersion = matcher.group(1); + String fullJavaVersion = matcher.group(2); return U.majorJavaVersion(fullJavaVersion); } diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParserTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParserTest.java index cbd12b910b4..07bb067e2cd 100644 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParserTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/multijvm/JavaVersionCommandParserTest.java @@ -49,6 +49,12 @@ public class JavaVersionCommandParserTest { "Java HotSpot(TM) Server VM (build 25.311-b11, mixed mode)", 8 }, + new Object[] { + "openjdk version \"1.8.0_352\"\n" + + "OpenJDK Runtime Environment (build 1.8.0_352-8u352-ga-1~22.04-b08)\n" + + "OpenJDK 64-Bit Server VM (build 25.352-b08, mixed mode)", + 8 + }, new Object[]{ "java version \"11.0.6\" 2020-01-14 LTS\n" + "Java(TM) SE Runtime Environment 18.9 (build 11.0.6+8-LTS)\n" + diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java index 33419af614b..7bcbd34be38 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotTestSuite.java @@ -17,6 +17,9 @@ package org.apache.ignite.testsuites; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import org.apache.ignite.internal.processors.cache.persistence.snapshot.EncryptedSnapshotTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotDeltaTest; @@ -32,27 +35,38 @@ import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSn import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotRestoreFromRemoteTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotWithMetastorageTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.PlainSnapshotTest; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.DynamicSuite; import org.junit.runner.RunWith; -import org.junit.runners.Suite; /** */ -@RunWith(Suite.class) -@Suite.SuiteClasses({ - IgniteSnapshotManagerSelfTest.class, - IgniteClusterSnapshotSelfTest.class, - IgniteSnapshotRemoteRequestTest.class, - IgniteClusterSnapshotCheckTest.class, - IgniteSnapshotWithMetastorageTest.class, - IgniteSnapshotMXBeanTest.class, - IgniteClusterSnapshotRestoreSelfTest.class, - IgniteClusterSnapshotHandlerTest.class, - IgniteSnapshotRestoreFromRemoteTest.class, - PlainSnapshotTest.class, - EncryptedSnapshotTest.class, - IgniteClusterSnapshotWalRecordTest.class, - IgniteClusterSnapshotStreamerTest.class, - IgniteSnapshotConsistencyTest.class, - IgniteClusterSnapshotDeltaTest.class -}) 
+@RunWith(DynamicSuite.class) public class IgniteSnapshotTestSuite { + /** */ + public static List<Class<?>> suite() { + List<Class<?>> suite = new ArrayList<>(); + + addSnapshotTests(suite, null); + + return suite; + } + + /** */ + public static void addSnapshotTests(List<Class<?>> suite, Collection<Class> ignoredTests) { + GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotManagerSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRemoteRequestTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotCheckTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotWithMetastorageTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotMXBeanTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotRestoreSelfTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotHandlerTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotRestoreFromRemoteTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, PlainSnapshotTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, EncryptedSnapshotTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotWalRecordTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotStreamerTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteSnapshotConsistencyTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotDeltaTest.class, ignoredTests); + } } diff --git a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java index be36d972ca1..240e8161226 100644 --- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java +++ b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteSnapshotWithIndexingTestSuite.java @@ -17,20 +17,34 @@ package org.apache.ignite.testsuites; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotCheckWithIndexesTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotMetricsTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotRestoreWithIndexingTest; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteClusterSnapshotWithIndexesTest; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.DynamicSuite; import org.junit.runner.RunWith; -import org.junit.runners.Suite; /** */ -@RunWith(Suite.class) -@Suite.SuiteClasses({ - IgniteClusterSnapshotWithIndexesTest.class, - IgniteClusterSnapshotCheckWithIndexesTest.class, - IgniteClusterSnapshotRestoreWithIndexingTest.class, - IgniteClusterSnapshotMetricsTest.class -}) +@RunWith(DynamicSuite.class) public class IgniteSnapshotWithIndexingTestSuite { + /** */ + public static List<Class<?>> suite() { + List<Class<?>> suite = new ArrayList<>(); + + addSnapshotTests(suite, null); + + return suite; + } + + /** */ + public static void addSnapshotTests(List<Class<?>> suite, Collection<Class> ignoredTests) { + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotWithIndexesTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotCheckWithIndexesTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotRestoreWithIndexingTest.class, ignoredTests); + GridTestUtils.addTestIfNeeded(suite, IgniteClusterSnapshotMetricsTest.class, ignoredTests); + } }