http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java new file mode 100644 index 0000000..70dda0b --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/internal/processors/compress/FileSystemUtilsTest.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.compress; + +import java.io.FileDescriptor; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import junit.framework.TestCase; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static java.nio.file.StandardOpenOption.READ; +import static java.nio.file.StandardOpenOption.TRUNCATE_EXISTING; +import static java.nio.file.StandardOpenOption.WRITE; +import static org.apache.ignite.internal.processors.compress.CompressionProcessorImpl.allocateDirectBuffer; +import static org.apache.ignite.internal.processors.compress.FileSystemUtils.getFileSystemBlockSize; +import static org.apache.ignite.internal.processors.compress.FileSystemUtils.getSparseFileSize; +import static org.apache.ignite.internal.processors.compress.FileSystemUtils.punchHole; + +/** + */ +public class FileSystemUtilsTest extends TestCase { + /** + * @throws Exception If failed. + */ + public void testSparseFiles() throws Exception { + if (!U.isLinux()) + return; + + Path file = Files.createTempFile("test_sparse_file_", ".bin"); + + try { + doTestSparseFiles(file, false); // Ext4 expected as default FS. + } + finally { + Files.delete(file); + } + } + + /** + * @throws Exception If failed. + */ + public void _testFileSystems() throws Exception { + doTestSparseFiles(Paths.get("/ext4/test_file"), false); + doTestSparseFiles(Paths.get("/btrfs/test_file"), false); + doTestSparseFiles(Paths.get("/xfs/test_file"), true); + } + + private static int getFD(FileChannel ch) throws IgniteCheckedException { + return U.<Integer>field(U.<FileDescriptor>field(ch, "fd"), "fd"); + } + + /** + * @param file File path. + * @param reopen Reopen file after each hole punch. XFS needs it. + * @throws Exception If failed. + */ + private void doTestSparseFiles(Path file, boolean reopen) throws Exception { + System.out.println(file); + + FileChannel ch = FileChannel.open(file, + READ, WRITE, TRUNCATE_EXISTING); + + try { + int fd = getFD(ch); + + int fsBlockSize = getFileSystemBlockSize(fd); + + System.out.println("fsBlockSize: " + fsBlockSize); + + assertTrue(fsBlockSize > 0); + + int pageSize = fsBlockSize * 4; + + ByteBuffer page = allocateDirectBuffer(pageSize); + + while (page.remaining() > 0) + page.putLong(0xABCDEF7654321EADL); + page.flip(); + + int pages = 5; + int blocks = pages * pageSize / fsBlockSize; + int fileSize = pages * pageSize; + int sparseSize = fileSize; + + for (int i = 0; i < pages; i++) { + ch.write(page, i * pageSize); + assertEquals(0, page.remaining()); + page.flip(); + } + + if (reopen) { + ch.force(true); + ch.close(); + ch = FileChannel.open(file, READ, WRITE); + fd = getFD(ch); + } + + assertEquals(fileSize, ch.size()); + assertEquals(fileSize, getSparseFileSize(fd)); + + int off = fsBlockSize * 3 - (fsBlockSize >>> 2); + int len = fsBlockSize; + assertEquals(0, punchHole(fd, off, len, fsBlockSize)); + if (reopen) { + ch.force(true); + ch.close(); + ch = FileChannel.open(file, READ, WRITE); + fd = getFD(ch); + } + assertEquals(fileSize, getSparseFileSize(fd)); + + off = 2 * fsBlockSize - 3; + len = 2 * fsBlockSize + 3; + assertEquals(2 * fsBlockSize, punchHole(fd, off, len, fsBlockSize)); + if (reopen) { + ch.force(true); + ch.close(); + ch = FileChannel.open(file, READ, WRITE); + fd = getFD(ch); + } + assertEquals(sparseSize -= 2 * fsBlockSize, getSparseFileSize(fd)); + + off = 10 * fsBlockSize; + len = 3 * fsBlockSize + 5; + assertEquals(3 * fsBlockSize, punchHole(fd, off, len, fsBlockSize)); + if (reopen) { + ch.force(true); + ch.close(); + ch = FileChannel.open(file, READ, WRITE); + fd = getFD(ch); + } + assertEquals(sparseSize -= 3 * fsBlockSize, getSparseFileSize(fd)); + + off = 15 * fsBlockSize + 1; + len = fsBlockSize; + assertEquals(0, punchHole(fd, off, len, fsBlockSize)); + + off = 15 * fsBlockSize - 1; + len = fsBlockSize; + assertEquals(0, punchHole(fd, off, len, fsBlockSize)); + + off = 15 * fsBlockSize; + len = fsBlockSize - 1; + assertEquals(0, punchHole(fd, off, len, fsBlockSize)); + + off = 15 * fsBlockSize; + len = fsBlockSize; + assertEquals(fsBlockSize, punchHole(fd, off, len, fsBlockSize)); + if (reopen) { + ch.force(true); + ch.close(); + ch = FileChannel.open(file, READ, WRITE); + fd = getFD(ch); + } + assertEquals(sparseSize -= fsBlockSize, getSparseFileSize(fd)); + + for (int i = 0; i < blocks - 1; i++) + punchHole(fd, fsBlockSize * i, fsBlockSize, fsBlockSize); + + if (reopen) { + ch.force(true); + ch.close(); + ch = FileChannel.open(file, READ, WRITE); + fd = getFD(ch); + } + + assertEquals(fsBlockSize, getSparseFileSize(fd)); + } + finally { + ch.close(); + } + } + +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java new file mode 100644 index 0000000..a977700 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; +import org.apache.ignite.internal.processors.compress.CompressionProcessorTest; +import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationAsyncTest; +import org.apache.ignite.internal.processors.compress.DiskPageCompressionIntegrationTest; +import org.apache.ignite.internal.processors.compress.FileSystemUtilsTest; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION; +import static org.apache.ignite.configuration.DiskPageCompression.ZSTD; + +/** + */ +public class IgnitePdsCompressionTestSuite { + /** + * @return Suite. + */ + public static TestSuite suite() { + TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite (with page compression)."); + + suite.addTestSuite(CompressionProcessorTest.class); + suite.addTestSuite(FileSystemUtilsTest.class); + suite.addTestSuite(DiskPageCompressionIntegrationTest.class); + suite.addTestSuite(DiskPageCompressionIntegrationAsyncTest.class); + + enableCompressionByDefault(); + IgnitePdsTestSuite.addRealPageStoreTests(suite); + + return suite; + } + + /** + */ + static void enableCompressionByDefault() { + System.setProperty(IGNITE_DEFAULT_DISK_PAGE_COMPRESSION, ZSTD.name()); + System.setProperty(IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, String.valueOf(8 * 1024)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java new file mode 100644 index 0000000..3fb8ac2 --- /dev/null +++ b/modules/compress/src/test/java/org/apache/ignite/testsuites/IgnitePdsCompressionTestSuite2.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.testsuites; + +import junit.framework.TestSuite; + +import static org.apache.ignite.testsuites.IgnitePdsCompressionTestSuite.enableCompressionByDefault; + +/** + */ +public class IgnitePdsCompressionTestSuite2 { + /** + * @return Suite. + */ + public static TestSuite suite() { + TestSuite suite = new TestSuite("Ignite Persistent Store Test Suite 2 (with page compression)."); + + enableCompressionByDefault(); + IgnitePdsTestSuite2.addRealPageStoreTests(suite); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java index cdde0ac..e23d188 100644 --- a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java @@ -212,4 +212,22 @@ public interface DataStorageMetrics { * @return Checkpoint buffer size in bytes. */ public long getCheckpointBufferSize(); + + /** + * Storage space allocated in bytes. + * + * @return Storage space allocated in bytes. + */ + public long getStorageSize(); + + /** + * Storage space allocated adjusted for possible sparsity in bytes. + * + * May produce unstable or even incorrect result on some file systems (e.g. XFS). + * Known to work correctly on Ext4 and Btrfs. + * + * @return Storage space allocated adjusted for possible sparsity in bytes + * or negative value is not supported. + */ + public long getSparseStorageSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java index ccf7ebf..2d27840 100644 --- a/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java +++ b/modules/core/src/main/java/org/apache/ignite/IgniteSystemProperties.java @@ -1047,6 +1047,16 @@ public final class IgniteSystemProperties { public static final String IGNITE_RECOVERY_VERBOSE_LOGGING = "IGNITE_RECOVERY_VERBOSE_LOGGING"; /** + * Sets default {@link CacheConfiguration#setDiskPageCompression disk page compression}. + */ + public static final String IGNITE_DEFAULT_DISK_PAGE_COMPRESSION = "IGNITE_DEFAULT_DISK_PAGE_COMPRESSION"; + + /** + * Sets default {@link DataStorageConfiguration#setPageSize storage page size}. + */ + public static final String IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE = "IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE"; + + /** * Enforces singleton. */ private IgniteSystemProperties() { @@ -1054,6 +1064,40 @@ public final class IgniteSystemProperties { } /** + * @param enumCls Enum type. + * @param name Name of the system property or environment variable. + * @return Enum value or {@code null} if the property is not set. + */ + public static <E extends Enum<E>> E getEnum(Class<E> enumCls, String name) { + return getEnum(enumCls, name, null); + } + + /** + * @param name Name of the system property or environment variable. + * @return Enum value or the given default. + */ + public static <E extends Enum<E>> E getEnum(String name, E dflt) { + return getEnum(dflt.getDeclaringClass(), name, dflt); + } + + /** + * @param enumCls Enum type. + * @param name Name of the system property or environment variable. + * @param dflt Default value. + * @return Enum value or the given default. + */ + private static <E extends Enum<E>> E getEnum(Class<E> enumCls, String name, E dflt) { + assert enumCls != null; + + String val = getString(name); + + if (val == null) + return dflt; + + return Enum.valueOf(enumCls, val); + } + + /** * Gets either system property or environment variable with given name. * * @param name Name of the system property or environment variable. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java index 5c91dc0..e27961d 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java @@ -33,6 +33,7 @@ import javax.cache.integration.CacheLoader; import javax.cache.integration.CacheWriter; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cache.CacheAtomicityMode; import org.apache.ignite.cache.CacheEntryProcessor; import org.apache.ignite.cache.CacheInterceptor; @@ -62,6 +63,8 @@ import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.spi.encryption.keystore.KeystoreEncryptionSpi; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DISK_PAGE_COMPRESSION; + /** * This class defines grid cache configuration. This configuration is passed to * grid via {@link IgniteConfiguration#getCacheConfiguration()} method. It defines all configuration @@ -383,6 +386,13 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { */ private boolean encryptionEnabled; + /** */ + private DiskPageCompression diskPageCompression = IgniteSystemProperties.getEnum( + DiskPageCompression.class, IGNITE_DEFAULT_DISK_PAGE_COMPRESSION); + + /** */ + private Integer diskPageCompressionLevel; + /** Empty constructor (all values are initialized to their defaults). */ public CacheConfiguration() { /* No-op. */ @@ -443,6 +453,8 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { nearCfg = cc.getNearConfiguration(); nodeFilter = cc.getNodeFilter(); onheapCache = cc.isOnheapCacheEnabled(); + diskPageCompression = cc.getDiskPageCompression(); + diskPageCompressionLevel = cc.getDiskPageCompressionLevel(); partLossPlc = cc.getPartitionLossPolicy(); pluginCfgs = cc.getPluginConfigurations(); qryDetailMetricsSz = cc.getQueryDetailMetricsSize(); @@ -2297,6 +2309,54 @@ public class CacheConfiguration<K, V> extends MutableConfiguration<K, V> { return this; } + /** + * Gets disk page compression algorithm. + * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}. + * + * @return Disk page compression algorithm. + * @see #getDiskPageCompressionLevel + */ + public DiskPageCompression getDiskPageCompression() { + return diskPageCompression; + } + + /** + * Sets disk page compression algorithm. + * Makes sense only with enabled {@link DataRegionConfiguration#setPersistenceEnabled persistence}. + * + * @param diskPageCompression Disk page compression algorithm. + * @return {@code this} for chaining. + * @see #setDiskPageCompressionLevel + */ + public CacheConfiguration<K,V> setDiskPageCompression(DiskPageCompression diskPageCompression) { + this.diskPageCompression = diskPageCompression; + + return this; + } + + /** + * Gets {@link #getDiskPageCompression algorithm} specific disk page compression level. + * + * @return Disk page compression level or {@code null} for default. + */ + public Integer getDiskPageCompressionLevel() { + return diskPageCompressionLevel; + } + + /** + * Sets {@link #setDiskPageCompression algorithm} specific disk page compression level. + * + * @param diskPageCompressionLevel Disk page compression level or {@code null} to use default. + * {@link DiskPageCompression#ZSTD Zstd}: from {@code -131072} to {@code 22} (default {@code 3}). + * {@link DiskPageCompression#LZ4 LZ4}: from {@code 0} to {@code 17} (default {@code 0}). + * @return {@code this} for chaining. + */ + public CacheConfiguration<K,V> setDiskPageCompressionLevel(Integer diskPageCompressionLevel) { + this.diskPageCompressionLevel = diskPageCompressionLevel; + + return this; + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(CacheConfiguration.class, this); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java index 20b314f..4aca0b1 100644 --- a/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DataStorageConfiguration.java @@ -28,6 +28,8 @@ import org.apache.ignite.internal.util.typedef.internal.A; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; +import static org.apache.ignite.IgniteSystemProperties.IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE; + /** * A durable memory configuration for an Apache Ignite node. The durable memory is a manageable off-heap based memory * architecture that divides all expandable data regions into pages of fixed size @@ -87,6 +89,12 @@ public class DataStorageConfiguration implements Serializable { /** Default memory page size. */ public static final int DFLT_PAGE_SIZE = 4 * 1024; + /** Max memory page size. */ + public static final int MAX_PAGE_SIZE = 16 * 1024; + + /** Min memory page size. */ + public static final int MIN_PAGE_SIZE = 1024; + /** This name is assigned to default Dataregion if no user-defined default MemPlc is specified */ public static final String DFLT_DATA_REG_DEFAULT_NAME = "default"; @@ -166,7 +174,8 @@ public class DataStorageConfiguration implements Serializable { private long sysRegionMaxSize = DFLT_SYS_REG_MAX_SIZE; /** Memory page size. */ - private int pageSize; + private int pageSize = IgniteSystemProperties.getInteger( + IGNITE_DEFAULT_DATA_STORAGE_PAGE_SIZE, 0); /** Concurrency level. */ private int concLvl; @@ -346,10 +355,13 @@ public class DataStorageConfiguration implements Serializable { * Changes the page size. * * @param pageSize Page size in bytes. If value is not set (or zero), {@link #DFLT_PAGE_SIZE} will be used. + * @see #MIN_PAGE_SIZE + * @see #MAX_PAGE_SIZE */ public DataStorageConfiguration setPageSize(int pageSize) { if (pageSize != 0) { - A.ensure(pageSize >= 1024 && pageSize <= 16 * 1024, "Page size must be between 1kB and 16kB."); + A.ensure(pageSize >= MIN_PAGE_SIZE && pageSize <= MAX_PAGE_SIZE, + "Page size must be between 1kB and 16kB."); A.ensure(U.isPow2(pageSize), "Page size must be a power of 2."); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java new file mode 100644 index 0000000..d628c6a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/configuration/DiskPageCompression.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.configuration; + +/** + * Disk page compression options. + * + * @see CacheConfiguration#setDiskPageCompression + * @see CacheConfiguration#setDiskPageCompressionLevel + */ +public enum DiskPageCompression { + /** Retain only useful data from half-filled pages, but do not apply any compression. */ + SKIP_GARBAGE, + + /** Zstd compression. */ + ZSTD, + + /** LZ4 compression. */ + LZ4, + + /** Snappy compression. */ + SNAPPY +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java index a43312c..e19450e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java @@ -34,6 +34,7 @@ import org.apache.ignite.internal.managers.failover.GridFailoverManager; import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; @@ -700,6 +701,11 @@ public interface GridKernalContext extends Iterable<GridComponent> { public Thread.UncaughtExceptionHandler uncaughtExceptionHandler(); /** + * @return Compression processor. + */ + public CompressionProcessor compress(); + + /** * @return {@code True} if node is in recovery mode (before join to topology). */ public boolean recoveryMode(); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java index 8a42664..ef69167 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java @@ -49,6 +49,7 @@ import org.apache.ignite.internal.managers.failover.GridFailoverManager; import org.apache.ignite.internal.managers.indexing.GridIndexingManager; import org.apache.ignite.internal.managers.loadbalancer.GridLoadBalancerManager; import org.apache.ignite.internal.processors.cache.mvcc.MvccProcessor; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.worker.WorkersRegistry; import org.apache.ignite.internal.processors.affinity.GridAffinityProcessor; import org.apache.ignite.internal.processors.authentication.IgniteAuthenticationProcessor; @@ -294,6 +295,10 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable /** */ @GridToStringExclude + private CompressionProcessor compressProc; + + /** */ + @GridToStringExclude private DataStructuresProcessor dataStructuresProc; /** Cache mvcc coordinators. */ @@ -639,6 +644,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable internalSubscriptionProc = (GridInternalSubscriptionProcessor)comp; else if (comp instanceof IgniteAuthenticationProcessor) authProc = (IgniteAuthenticationProcessor)comp; + else if (comp instanceof CompressionProcessor) + compressProc = (CompressionProcessor)comp; else if (!(comp instanceof DiscoveryNodeValidationProcessor || comp instanceof PlatformPluginProcessor)) assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass(); @@ -1184,6 +1191,11 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable } /** {@inheritDoc} */ + @Override public CompressionProcessor compress() { + return compressProc; + } + + /** {@inheritDoc} */ @Override public boolean recoveryMode() { return recoveryMode; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java index 0cd2fc1..65cbb90 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteComponentType.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal; import java.lang.reflect.Constructor; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.jetbrains.annotations.Nullable; @@ -89,6 +90,12 @@ public enum IgniteComponentType { "org.apache.ignite.internal.processors.schedule.IgniteNoopScheduleProcessor", "org.apache.ignite.internal.processors.schedule.IgniteScheduleProcessor", "ignite-schedule" + ), + + COMPRESSION( + CompressionProcessor.class.getName(), + "org.apache.ignite.internal.processors.compress.CompressionProcessorImpl", + "ignite-compress" ); /** No-op class name. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java index 710fd09..284a4cb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java @@ -236,6 +236,7 @@ import static org.apache.ignite.internal.GridKernalState.STARTED; import static org.apache.ignite.internal.GridKernalState.STARTING; import static org.apache.ignite.internal.GridKernalState.STOPPED; import static org.apache.ignite.internal.GridKernalState.STOPPING; +import static org.apache.ignite.internal.IgniteComponentType.COMPRESSION; import static org.apache.ignite.internal.IgniteComponentType.HADOOP_HELPER; import static org.apache.ignite.internal.IgniteComponentType.IGFS; import static org.apache.ignite.internal.IgniteComponentType.IGFS_HELPER; @@ -1002,6 +1003,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable { // Start processors before discovery manager, so they will // be able to start receiving messages once discovery completes. try { + startProcessor(COMPRESSION.createOptional(ctx)); startProcessor(new PdsConsistentIdProcessor(ctx)); startProcessor(new MvccProcessorImpl(ctx)); startProcessor(createComponent(DiscoveryNodeValidationProcessor.class, ctx)); http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java index 7a7f964..7c1e15d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/store/PageStore.java @@ -17,9 +17,8 @@ package org.apache.ignite.internal.pagemem.store; -import org.apache.ignite.IgniteCheckedException; - import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.persistence.StorageException; /** @@ -128,4 +127,35 @@ public interface PageStore { * @throws StorageException If failed. */ public void truncate(int tag) throws StorageException; + + /** + * @return Page size in bytes. + */ + public int getPageSize(); + + /** + * @return Storage block size or negative value if unknown or not supported. + */ + public int getBlockSize(); + + /** + * @return Size of the storage in bytes. May differ from {@link #pages()} * {@link #getPageSize()} + * due to delayed writes or due to other implementation specific details. + */ + public long size(); + + /** + * @return Size of the storage adjusted for sparsity in bytes or negative + * value if not supported. Should be less than or equal to {@link #size()}. + * @see #punchHole + */ + public long getSparseSize(); + + /** + * Should free all the extra storage space after the given number of useful bytes in the given page. + * + * @param pageId Page id. + * @param usefulBytes Number of meaningful bytes from the beginning of the page. + */ + void punchHole(long pageId, int usefulBytes); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java new file mode 100644 index 0000000..d9e977a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheCompressionManager.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.io.File; +import java.nio.ByteBuffer; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.DataStorageConfiguration; +import org.apache.ignite.configuration.DiskPageCompression; +import org.apache.ignite.internal.pagemem.store.PageStore; +import org.apache.ignite.internal.processors.compress.CompressionProcessor; +import org.apache.ignite.internal.util.typedef.internal.U; + +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.checkCompressionLevelBounds; +import static org.apache.ignite.internal.processors.compress.CompressionProcessor.getDefaultCompressionLevel; + +/** + * Cache compression manager. + */ +public class CacheCompressionManager extends GridCacheManagerAdapter { + /** */ + private DiskPageCompression diskPageCompression; + + /** */ + private int diskPageCompressLevel; + + /** */ + private CompressionProcessor compressProc; + + /** {@inheritDoc} */ + @Override protected void start0() throws IgniteCheckedException { + compressProc = cctx.kernalContext().compress(); + + CacheConfiguration cfg = cctx.config(); + + diskPageCompression = cfg.getDiskPageCompression(); + + if (diskPageCompression != null) { + if (!cctx.dataRegion().config().isPersistenceEnabled()) + throw new IgniteCheckedException("Disk page compression makes sense only with enabled persistence."); + + Integer lvl = cfg.getDiskPageCompressionLevel(); + diskPageCompressLevel = lvl != null ? + checkCompressionLevelBounds(lvl, diskPageCompression) : + getDefaultCompressionLevel(diskPageCompression); + + DataStorageConfiguration dsCfg = cctx.kernalContext().config().getDataStorageConfiguration(); + + File dbPath = cctx.kernalContext().pdsFolderResolver().resolveFolders().persistentStoreRootPath(); + + assert dbPath != null; + + compressProc.checkPageCompressionSupported(dbPath.toPath(), dsCfg.getPageSize()); + + if (log.isInfoEnabled()) { + log.info("Disk page compression is enabled [cache=" + cctx.name() + + ", compression=" + diskPageCompression + ", level=" + diskPageCompressLevel + "]"); + } + } + } + + /** + * @param page Page buffer. + * @param store Page store. + * @return Compressed or the same buffer. + * @throws IgniteCheckedException If failed. + */ + public ByteBuffer compressPage(ByteBuffer page, PageStore store) throws IgniteCheckedException { + if (diskPageCompression == null) + return page; + + int blockSize = store.getBlockSize(); + + if (blockSize <= 0) + throw new IgniteCheckedException("Failed to detect storage block size on " + U.osString()); + + return compressProc.compressPage(page, store.getPageSize(), blockSize, diskPageCompression, diskPageCompressLevel); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java index 5ece77f..59894e3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupMetricsMXBeanImpl.java @@ -28,15 +28,17 @@ import java.util.UUID; import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.cache.CacheMode; import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.affinity.AffinityAssignment; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; -import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; +import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker; import org.apache.ignite.internal.processors.cache.persistence.DataRegion; import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.mxbean.CacheGroupMetricsMXBean; /** @@ -359,4 +361,21 @@ public class CacheGroupMetricsMXBeanImpl implements CacheGroupMetricsMXBean { @Override public long getTotalAllocatedSize() { return getTotalAllocatedPages() * ctx.dataRegion().pageMemory().pageSize(); } + + /** {@inheritDoc} */ + @Override public long getStorageSize() { + return database().forGroupPageStores(ctx, PageStore::size); + } + + /** {@inheritDoc} */ + @Override public long getSparseStorageSize() { + return database().forGroupPageStores(ctx, PageStore::getSparseSize); + } + + /** + * @return Database. + */ + private GridCacheDatabaseSharedManager database() { + return (GridCacheDatabaseSharedManager)ctx.shared().database(); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 30cf969..1a8cf88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -177,6 +177,9 @@ public class GridCacheContext<K, V> implements Externalizable { /** Store manager. */ private CacheStoreManager storeMgr; + /** Compression manager. */ + private CacheCompressionManager compressMgr; + /** Replication manager. */ private GridCacheDrManager drMgr; @@ -321,6 +324,7 @@ public class GridCacheContext<K, V> implements Externalizable { * =========================== */ + CacheCompressionManager compressMgr, GridCacheEventManager evtMgr, CacheStoreManager storeMgr, CacheEvictionManager evictMgr, @@ -338,6 +342,7 @@ public class GridCacheContext<K, V> implements Externalizable { assert cacheCfg != null; assert locStartTopVer != null : cacheCfg.getName(); + assert compressMgr != null; assert grp != null; assert evtMgr != null; assert storeMgr != null; @@ -364,6 +369,7 @@ public class GridCacheContext<K, V> implements Externalizable { * Managers in starting order! * =========================== */ + this.compressMgr = add(compressMgr); this.evtMgr = add(evtMgr); this.storeMgr = add(storeMgr); this.evictMgr = add(evictMgr); @@ -1230,6 +1236,13 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * @return Compression manager. + */ + public CacheCompressionManager compress() { + return compressMgr; + } + + /** * Sets cache object context. * * @param cacheObjCtx Cache object context. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index ce81468..8a54852 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -1524,6 +1524,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { boolean nearEnabled = GridCacheUtils.isNearEnabled(cfg); + CacheCompressionManager compressMgr = new CacheCompressionManager(); GridCacheAffinityManager affMgr = new GridCacheAffinityManager(); GridCacheEventManager evtMgr = new GridCacheEventManager(); CacheEvictionManager evictMgr = (nearEnabled || cfg.isOnheapCacheEnabled()) ? new GridCacheEvictionManager() : new CacheOffheapEvictionManager(); @@ -1558,6 +1559,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Managers in starting order! * =========================== */ + compressMgr, evtMgr, storeMgr, evictMgr, @@ -1694,6 +1696,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { * Managers in starting order! * =========================== */ + compressMgr, evtMgr, storeMgr, evictMgr, http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java index 03955a4..4565b58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java @@ -93,6 +93,12 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { /** */ private volatile Collection<DataRegionMetrics> regionMetrics; + /** */ + private volatile long storageSize; + + /** */ + private volatile long sparseStorageSize; + /** * @param metricsEnabled Metrics enabled flag. * @param rateTimeInterval Rate time interval. @@ -485,6 +491,16 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { return metricsEnabled; } + /** {@inheritDoc} */ + @Override public long getStorageSize() { + return storageSize; + } + + /** {@inheritDoc} */ + @Override public long getSparseStorageSize() { + return sparseStorageSize; + } + /** * @param lockWaitDuration Lock wait duration. * @param markDuration Mark duration. @@ -503,7 +519,9 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { long duration, long totalPages, long dataPages, - long cowPages + long cowPages, + long storageSize, + long sparseStorageSize ) { if (metricsEnabled) { lastCpLockWaitDuration = lockWaitDuration; @@ -514,6 +532,8 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { lastCpTotalPages = totalPages; lastCpDataPages = dataPages; lastCpCowPages = cowPages; + this.storageSize = storageSize; + this.sparseStorageSize = sparseStorageSize; totalCheckpointTime.addAndGet(duration); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java index c3bcd5b..78b08bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java @@ -101,6 +101,12 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { /** */ private long totalAllocatedSize; + /** */ + private long storageSize; + + /** */ + private long sparseStorageSize; + /** * @param metrics Metrics. */ @@ -131,6 +137,8 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { offHeapSize = metrics.getOffHeapSize(); offHeadUsedSize = metrics.getOffheapUsedSize(); totalAllocatedSize = metrics.getTotalAllocatedSize(); + storageSize = metrics.getStorageSize(); + sparseStorageSize = metrics.getSparseStorageSize(); } /** {@inheritDoc} */ @@ -264,6 +272,16 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { } /** {@inheritDoc} */ + @Override public long getStorageSize() { + return storageSize; + } + + /** {@inheritDoc} */ + @Override public long getSparseStorageSize() { + return sparseStorageSize; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStorageMetricsSnapshot.class, this); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java index ed54f65..9a083f8 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java @@ -54,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Predicate; +import java.util.function.ToLongFunction; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -398,6 +399,13 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan : ctx.config().getFailureDetectionTimeout())); } + /** + * @return File store manager. + */ + public FilePageStoreManager getFileStoreManager() { + return storeMgr; + } + /** */ private void notifyMetastorageReadyForRead() throws IgniteCheckedException { for (MetastorageLifecycleListener lsnr : metastorageLifecycleLsnrs) @@ -1935,6 +1943,44 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } /** + * @param f Consumer. + * @return Accumulated result for all page stores. + */ + public long forAllPageStores(ToLongFunction<PageStore> f) { + long res = 0; + + for (CacheGroupContext gctx : cctx.cache().cacheGroups()) + res += forGroupPageStores(gctx, f); + + return res; + } + + /** + * @param gctx Group context. + * @param f Consumer. + * @return Accumulated result for all page stores. + */ + public long forGroupPageStores(CacheGroupContext gctx, ToLongFunction<PageStore> f) { + int groupId = gctx.groupId(); + + long res = 0; + + try { + Collection<PageStore> stores = storeMgr.getStores(groupId); + + if (stores != null) { + for (PageStore store : stores) + res += f.applyAsLong(store); + } + } + catch (IgniteCheckedException e) { + throw new IgniteException(e); + } + + return res; + } + + /** * Calculates tail pointer for WAL at the end of logical recovery. * * @param from Start replay WAL from. @@ -3263,28 +3309,9 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan tracker.totalDuration())); } } - - persStoreMetrics.onCheckpoint( - tracker.lockWaitDuration(), - tracker.markDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.totalDuration(), - chp.pagesSize, - tracker.dataPagesWritten(), - tracker.cowPagesWritten()); - } - else { - persStoreMetrics.onCheckpoint( - tracker.lockWaitDuration(), - tracker.markDuration(), - tracker.pagesWriteDuration(), - tracker.fsyncDuration(), - tracker.totalDuration(), - chp.pagesSize, - tracker.dataPagesWritten(), - tracker.cowPagesWritten()); } + + updateMetrics(chp, tracker); } catch (IgniteCheckedException e) { if (chp != null) @@ -3294,6 +3321,26 @@ public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedMan } } + /** + * @param chp Checkpoint. + * @param tracker Tracker. + */ + private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) { + if (persStoreMetrics.metricsEnabled()) { + persStoreMetrics.onCheckpoint( + tracker.lockWaitDuration(), + tracker.markDuration(), + tracker.pagesWriteDuration(), + tracker.fsyncDuration(), + tracker.totalDuration(), + chp.pagesSize, + tracker.dataPagesWritten(), + tracker.cowPagesWritten(), + forAllPageStores(PageStore::size), + forAllPageStores(PageStore::getSparseSize)); + } + } + /** */ private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) { String res; http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java index d0211f4..d0aaef7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AbstractFileIO.java @@ -56,7 +56,7 @@ public abstract class AbstractFileIO implements FileIO { i += n; time = 0; } - else if (n == 0) { + else if (n == 0 || i > 0) { if (!write && available(num - i, position + i) == 0) return i; http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java index fd00e25..7c6ece8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/AsyncFileIO.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.cache.persistence.file; import java.io.File; +import java.io.FileDescriptor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; @@ -25,8 +26,10 @@ import java.nio.channels.AsynchronousFileChannel; import java.nio.channels.CompletionHandler; import java.nio.file.OpenOption; import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.processors.compress.FileSystemUtils; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.internal.U; /** * File I/O implementation based on {@link AsynchronousFileChannel}. @@ -37,6 +40,12 @@ public class AsyncFileIO extends AbstractFileIO { */ private final AsynchronousFileChannel ch; + /** Native file descriptor. */ + private final int fd; + + /** */ + private final int fsBlockSize; + /** * Channel's position. */ @@ -54,11 +63,36 @@ public class AsyncFileIO extends AbstractFileIO { * @param modes Open modes. */ public AsyncFileIO(File file, ThreadLocal<ChannelOpFuture> holder, OpenOption... modes) throws IOException { - this.ch = AsynchronousFileChannel.open(file.toPath(), modes); - + ch = AsynchronousFileChannel.open(file.toPath(), modes); + fd = getFileDescriptor(ch); + fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd); this.holder = holder; } + /** + * @param ch File channel. + * @return Native file descriptor. + */ + private static int getFileDescriptor(AsynchronousFileChannel ch) { + FileDescriptor fd = U.field(ch, "fdObj"); + return U.field(fd, "fd"); + } + + /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return fsBlockSize; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return FileSystemUtils.getSparseFileSize(fd); + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + return (int)FileSystemUtils.punchHole(fd, position, len, fsBlockSize); + } + /** {@inheritDoc} */ @Override public long position() throws IOException { return position; http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java index 86d9bbc..f21b8ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/EncryptedFileIO.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; - import org.apache.ignite.internal.processors.cache.persistence.wal.crc.FastCrc; import org.apache.ignite.spi.encryption.EncryptionSpi; import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; @@ -100,6 +99,21 @@ public class EncryptedFileIO implements FileIO { } /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return -1; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return -1; + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public long position() throws IOException { return plainFileIO.position(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java index 6f32d01..546d1a7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIO.java @@ -253,4 +253,23 @@ public interface FileIO extends AutoCloseable { * @throws IOException If some I/O error occurs. */ @Override public void close() throws IOException; + + /** + * @return File system block size or negative value if unknown. + */ + public int getFileSystemBlockSize(); + + /** + * @param position Starting file position. + * @param len Number of bytes to free. + * @return The actual freed size or negative value if not supported. + */ + int punchHole(long position, int len); + + /** + * @return Approximate system dependent size of the storage or negative + * value if not supported. + * @see #punchHole + */ + long getSparseSize(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java index 8e79b54..c615a34 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FileIODecorator.java @@ -26,7 +26,7 @@ import java.nio.MappedByteBuffer; */ public class FileIODecorator extends AbstractFileIO { /** File I/O delegate */ - private final FileIO delegate; + protected final FileIO delegate; /** * @@ -37,6 +37,21 @@ public class FileIODecorator extends AbstractFileIO { } /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return delegate.getFileSystemBlockSize(); + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return delegate.getSparseSize(); + } + + /** {@inheritDoc} */ + @Override public int punchHole(long pos, int len) { + return delegate.punchHole(pos, len); + } + + /** {@inheritDoc} */ @Override public long position() throws IOException { return delegate.position(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java index 16d74c3..a8fae08 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStore.java @@ -27,8 +27,8 @@ import java.nio.file.Files; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; - import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.internal.pagemem.PageIdUtils; @@ -71,7 +71,7 @@ public class FilePageStore implements PageStore { private final FileIOFactory ioFactory; /** I/O interface for read/write operations with file */ - private volatile FileIO fileIO; + protected volatile FileIO fileIO; /** */ private final AtomicLong allocated; @@ -80,7 +80,7 @@ public class FilePageStore implements PageStore { private final AllocatedPageTracker allocatedTracker; /** */ - private final int pageSize; + protected final int pageSize; /** */ private volatile boolean inited; @@ -105,7 +105,8 @@ public class FilePageStore implements PageStore { File file, FileIOFactory factory, DataStorageConfiguration cfg, - AllocatedPageTracker allocatedTracker) { + AllocatedPageTracker allocatedTracker + ) { this.type = type; this.cfgFile = file; this.dbCfg = cfg; @@ -116,6 +117,38 @@ public class FilePageStore implements PageStore { } /** {@inheritDoc} */ + @Override public int getPageSize() { + return pageSize; + } + + /** {@inheritDoc} */ + @Override public int getBlockSize() { + return -1; // Header is unaligned in this version. + } + + /** {@inheritDoc} */ + @Override public long size() { + try { + FileIO io = fileIO; + + return io == null ? 0 : io.size(); + } + catch (IOException e) { + throw new IgniteException(e); + } + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return -1; + } + + /** {@inheritDoc} */ + @Override public void punchHole(long pageId, int usefulBytes) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public boolean exists() { return cfgFile.exists() && cfgFile.length() > headerSize(); } @@ -228,10 +261,8 @@ public class FilePageStore implements PageStore { if (fileSize == headerSize()) // Every file has a special meta page. fileSize = pageSize + headerSize(); - if ((fileSize - headerSize()) % pageSize != 0) - throw new IOException(prefix + "(invalid file size)" + - " [fileSize=" + U.hexLong(fileSize) + - ", pageSize=" + U.hexLong(pageSize) + ']'); + if (fileSize % pageSize != 0) // In the case of compressed pages we can miss the tail of the page. + fileSize = (fileSize / pageSize + 1) * pageSize; return fileSize; } @@ -333,6 +364,26 @@ public class FilePageStore implements PageStore { } } + /** + * @param pageId Page ID. + * @param pageBuf Page buffer. + * @return Number of bytes to calculate CRC on. + */ + private int getCrcSize(long pageId, ByteBuffer pageBuf) throws IOException { + int compressedSize = PageIO.getCompressedSize(pageBuf); + + if (compressedSize == 0) + return pageSize; // Page is not compressed. + + if (compressedSize < 0 || compressedSize > pageSize) { + throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " + + "[id=" + U.hexLong(pageId) + ", file=" + cfgFile.getAbsolutePath() + ", fileSize=" + fileIO.size() + + ", page=" + U.toHexString(pageBuf) + "]"); + } + + return compressedSize; + } + /** {@inheritDoc} */ @Override public void read(long pageId, ByteBuffer pageBuf, boolean keepCrc) throws IgniteCheckedException { init(); @@ -363,7 +414,7 @@ public class FilePageStore implements PageStore { pageBuf.position(0); if (!skipCrc) { - int curCrc32 = FastCrc.calcCrc(pageBuf, pageSize); + int curCrc32 = FastCrc.calcCrc(pageBuf, getCrcSize(pageId, pageBuf)); if ((savedCrc32 ^ curCrc32) != 0) throw new IgniteDataIntegrityViolationException("Failed to read page (CRC validation failed) " + @@ -549,7 +600,6 @@ public class FilePageStore implements PageStore { "off=" + U.hexLong(off) + ", allocated=" + U.hexLong(allocated.get()) + ", pageId=" + U.hexLong(pageId) + ", file=" + cfgFile.getPath(); - assert pageBuf.capacity() == pageSize; assert pageBuf.position() == 0; assert pageBuf.order() == ByteOrder.nativeOrder() : "Page buffer order " + pageBuf.order() + " should be same with " + ByteOrder.nativeOrder(); @@ -559,7 +609,7 @@ public class FilePageStore implements PageStore { if (calculateCrc && !skipCrc) { assert PageIO.getCrc(pageBuf) == 0 : U.hexLong(pageId); - PageIO.setCrc(pageBuf, calcCrc32(pageBuf, pageSize)); + PageIO.setCrc(pageBuf, calcCrc32(pageBuf, getCrcSize(pageId, pageBuf))); } // Check whether crc was calculated somewhere above the stack if it is forcibly skipped. http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java index 4a14c6b..86560ba 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreManager.java @@ -30,6 +30,7 @@ import java.nio.file.DirectoryStream; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; +import java.util.AbstractList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -54,6 +55,7 @@ import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; import org.apache.ignite.internal.pagemem.store.PageStore; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor; +import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; import org.apache.ignite.internal.processors.cache.StoredCacheData; import org.apache.ignite.internal.processors.cache.persistence.AllocatedPageTracker; @@ -62,6 +64,7 @@ import org.apache.ignite.internal.processors.cache.persistence.StorageException; import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage; import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager; +import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; import org.apache.ignite.internal.util.IgniteUtils; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -73,6 +76,7 @@ import org.jetbrains.annotations.Nullable; import static java.nio.file.Files.delete; import static java.nio.file.Files.newDirectoryStream; +import static java.util.Objects.requireNonNull; /** * File page store manager. @@ -461,6 +465,10 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen try { store.read(pageId, pageBuf, keepCrc); + + assert keepCrc || PageIO.getCrc(pageBuf) == 0: store.size() - store.pageOffset(pageId); + + cctx.kernalContext().compress().decompressPage(pageBuf, store.getPageSize()); } catch (StorageException e) { cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); @@ -511,13 +519,40 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen * @return PageStore to which the page has been written. * @throws IgniteCheckedException If IO error occurred. */ - public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) throws IgniteCheckedException { + public PageStore writeInternal(int cacheId, long pageId, ByteBuffer pageBuf, int tag, boolean calculateCrc) + throws IgniteCheckedException { int partId = PageIdUtils.partId(pageId); PageStore store = getStore(cacheId, partId); try { + int pageSize = store.getPageSize(); + int compressedPageSize = pageSize; + + GridCacheContext cctx0 = cctx.cacheContext(cacheId); + + if (cctx0 != null) { + assert pageBuf.position() == 0 && pageBuf.limit() == pageSize: pageBuf; + + ByteBuffer compressedPageBuf = cctx0.compress().compressPage(pageBuf, store); + + if (compressedPageBuf != pageBuf) { + compressedPageSize = PageIO.getCompressedSize(compressedPageBuf); + + if (!calculateCrc) { + calculateCrc = true; + PageIO.setCrc(compressedPageBuf, 0); // It will be recalculated over compressed data further. + } + + PageIO.setCrc(pageBuf, 0); // It is expected to be reset to 0 after each write. + pageBuf = compressedPageBuf; + } + } + store.write(pageId, pageBuf, tag, calculateCrc); + + if (pageSize > compressedPageSize) + store.punchHole(pageId, compressedPageSize); // TODO maybe add async punch mode? } catch (StorageException e) { cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); @@ -1048,6 +1083,15 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** * @param grpId Cache group ID. + * @return Collection of related page stores. + * @throws IgniteCheckedException If failed. + */ + public Collection<PageStore> getStores(int grpId) throws IgniteCheckedException { + return getHolder(grpId); + } + + /** + * @param grpId Cache group ID. * @param partId Partition ID. * @return Page store for the corresponding parameters. * @throws IgniteCheckedException If cache or partition with the given ID was not created. @@ -1125,7 +1169,7 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen /** * */ - private static class CacheStoreHolder { + private static class CacheStoreHolder extends AbstractList<PageStore> { /** Index store. */ private final PageStore idxStore; @@ -1133,11 +1177,20 @@ public class FilePageStoreManager extends GridCacheSharedManagerAdapter implemen private final PageStore[] partStores; /** - * */ - public CacheStoreHolder(PageStore idxStore, PageStore[] partStores) { - this.idxStore = idxStore; - this.partStores = partStores; + CacheStoreHolder(PageStore idxStore, PageStore[] partStores) { + this.idxStore = requireNonNull(idxStore); + this.partStores = requireNonNull(partStores); + } + + /** {@inheritDoc} */ + @Override public PageStore get(int idx) { + return requireNonNull(idx == partStores.length ? idxStore : partStores[idx]); + } + + /** {@inheritDoc} */ + @Override public int size() { + return partStores.length + 1; } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java index d8c800d..de078eb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/FilePageStoreV2.java @@ -57,4 +57,25 @@ public class FilePageStoreV2 extends FilePageStore { @Override public int version() { return VERSION; } + + /** {@inheritDoc} */ + @Override public int getBlockSize() { + return fileIO.getFileSystemBlockSize(); + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + FileIO io = fileIO; + + return io == null ? 0 : fileIO.getSparseSize(); + } + + /** {@inheritDoc} */ + @Override public void punchHole(long pageId, int usefulBytes) { + assert usefulBytes >= 0 && usefulBytes < pageSize: usefulBytes; + + long off = pageOffset(pageId); + + fileIO.punchHole(off + usefulBytes, pageSize - usefulBytes); + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java index ef4a3df..c6922bc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/RandomAccessFileIO.java @@ -18,11 +18,14 @@ package org.apache.ignite.internal.processors.cache.persistence.file; import java.io.File; +import java.io.FileDescriptor; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.OpenOption; +import org.apache.ignite.internal.processors.compress.FileSystemUtils; +import org.apache.ignite.internal.util.typedef.internal.U; /** * File I/O implementation based on {@link FileChannel}. @@ -33,6 +36,12 @@ public class RandomAccessFileIO extends AbstractFileIO { */ private final FileChannel ch; + /** Native file descriptor. */ + private final int fd; + + /** */ + private final int fsBlockSize; + /** * Creates I/O implementation for specified {@code file} * @@ -41,6 +50,32 @@ public class RandomAccessFileIO extends AbstractFileIO { */ public RandomAccessFileIO(File file, OpenOption... modes) throws IOException { ch = FileChannel.open(file.toPath(), modes); + fd = getNativeFileDescriptor(ch); + fsBlockSize = FileSystemUtils.getFileSystemBlockSize(fd); + } + + /** + * @param ch File channel. + * @return Native file descriptor. + */ + private static int getNativeFileDescriptor(FileChannel ch) { + FileDescriptor fd = U.field(ch, "fd"); + return U.field(fd, "fd"); + } + + /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return fsBlockSize; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return FileSystemUtils.getSparseFileSize(fd); + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + return (int)FileSystemUtils.punchHole(fd, position, len, fsBlockSize); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java index 6345b1f..5300d83 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/file/UnzipFileIO.java @@ -54,6 +54,21 @@ public class UnzipFileIO extends AbstractFileIO { } /** {@inheritDoc} */ + @Override public int getFileSystemBlockSize() { + return -1; + } + + /** {@inheritDoc} */ + @Override public long getSparseSize() { + return -1; + } + + /** {@inheritDoc} */ + @Override public int punchHole(long position, int len) { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ @Override public long position() throws IOException { return totalBytesRead; } http://git-wip-us.apache.org/repos/asf/ignite/blob/e8eeea37/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java index b64b294..03f66c0 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java @@ -530,7 +530,7 @@ public class PageMemoryImpl implements PageMemoryEx { PageHeader.writeTimestamp(absPtr, U.currentTimeMillis()); rwLock.init(absPtr + PAGE_LOCK_OFFSET, PageIdUtils.tag(pageId)); - assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 + assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 assert !PageHeader.isAcquired(absPtr) : "Pin counter must be 0 for a new page [relPtr=" + U.hexLong(relPtr) + @@ -808,9 +808,9 @@ public class PageMemoryImpl implements PageMemoryEx { memMetrics.onPageRead(); } - catch (IgniteDataIntegrityViolationException ignore) { + catch (IgniteDataIntegrityViolationException e) { U.warn(log, "Failed to read page (data integrity violation encountered, will try to " + - "restore using existing WAL) [fullPageId=" + fullId + ']'); + "restore using existing WAL) [fullPageId=" + fullId + ']', e); buf.rewind(); @@ -1252,8 +1252,8 @@ public class PageMemoryImpl implements PageMemoryEx { GridUnsafe.copyMemory(absPtr + PAGE_OVERHEAD, tmpPtr, pageSize()); - assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 - assert GridUnsafe.getInt(tmpPtr + 4) == 0; //TODO GG-11480 + assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 + assert PageIO.getCrc(tmpPtr) == 0; //TODO GG-11480 } else { byte[] arr = buf.array(); @@ -1402,7 +1402,7 @@ public class PageMemoryImpl implements PageMemoryEx { if (touch) PageHeader.writeTimestamp(absPtr, U.currentTimeMillis()); - assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 + assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 return absPtr + PAGE_OVERHEAD; } @@ -1490,11 +1490,11 @@ public class PageMemoryImpl implements PageMemoryEx { PageHeader.dirty(absPtr, false); PageHeader.tempBufferPointer(absPtr, tmpRelPtr); - assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 - assert GridUnsafe.getInt(tmpAbsPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 + assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 + assert PageIO.getCrc(tmpAbsPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 } - assert GridUnsafe.getInt(absPtr + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 + assert PageIO.getCrc(absPtr + PAGE_OVERHEAD) == 0; //TODO GG-11480 return absPtr + PAGE_OVERHEAD; } @@ -1522,7 +1522,7 @@ public class PageMemoryImpl implements PageMemoryEx { boolean pageWalRec = markDirty && walPlc != FALSE && (walPlc == TRUE || !wasDirty); - assert GridUnsafe.getInt(page + PAGE_OVERHEAD + 4) == 0; //TODO GG-11480 + assert PageIO.getCrc(page + PAGE_OVERHEAD) == 0; //TODO GG-11480 if (markDirty) setDirty(fullId, page, markDirty, false);