This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 90aeb56 IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794. 90aeb56 is described below commit 90aeb56b3f8aae2e452243e8c0ce2c08e8ebb6fe Author: Kirill Tkalenko <tkalkir...@yandex.ru> AuthorDate: Thu Mar 18 14:41:24 2021 +0300 IGNITE-14170 New metrics for number of bytes written to WAL log and compressed in archive - Fixes #8794. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../java/org/apache/ignite/DataStorageMetrics.java | 14 ++ .../cache/persistence/DataStorageMetricsImpl.java | 57 ++++++++- .../persistence/DataStorageMetricsSnapshot.java | 18 +++ .../persistence/wal/FileWriteAheadLogManager.java | 13 +- .../ignite/mxbean/DataStorageMetricsMXBean.java | 8 ++ .../IgniteDataStorageMetricsSelfTest.java | 142 +++++++++++++++++++++ 6 files changed, 245 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java index b54d2b3..1a549c5 100644 --- a/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java +++ b/modules/core/src/main/java/org/apache/ignite/DataStorageMetrics.java @@ -249,4 +249,18 @@ public interface DataStorageMetrics { * or negative value is not supported. */ public long getSparseStorageSize(); + + /** + * Getting the total number of logged bytes into the WAL. + * + * @return Number of bytes. + */ + long getWalWrittenBytes(); + + /** + * Getting the total size of the compressed segments in bytes. + * + * @return Number of bytes. 
+ */ + long getWalCompressedBytes(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java index 1083a50..73c8dc98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsImpl.java @@ -25,10 +25,12 @@ import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; import org.apache.ignite.internal.processors.metric.impl.HistogramMetricImpl; import org.apache.ignite.internal.processors.metric.impl.HitRateMetric; +import org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteOutClosure; import org.apache.ignite.mxbean.DataStorageMetricsMXBean; +import org.jetbrains.annotations.Nullable; /** * @@ -106,8 +108,8 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { /** */ private volatile boolean metricsEnabled; - /** */ - private volatile IgniteWriteAheadLogManager wal; + /** WAL manager. */ + @Nullable private volatile IgniteWriteAheadLogManager wal; /** */ private volatile IgniteOutClosure<Long> walSizeProvider; @@ -160,6 +162,12 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { /** */ private final HistogramMetricImpl cpHistogram; + /** Total number of logged bytes into the WAL. */ + private final LongAdderMetric walWrittenBytes; + + /** Total size of the compressed segments in bytes. */ + private final LongAdderMetric walCompressedBytes; + /** * @param mmgr Metrics manager. * @param metricsEnabled Metrics enabled flag. 
@@ -308,6 +316,16 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { cpHistogram = mreg.histogram("CheckpointHistogram", cpBounds, "Histogram of checkpoint duration in milliseconds."); + + walWrittenBytes = mreg.longAdderMetric( + "WalWrittenBytes", + "Total number of logged bytes into the WAL." + ); + + walCompressedBytes = mreg.longAdderMetric( + "WalCompressedBytes", + "Total size of the compressed segments in bytes." + ); } /** {@inheritDoc} */ @@ -331,7 +349,9 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { if (!metricsEnabled) return 0; - return wal.walArchiveSegments(); + IgniteWriteAheadLogManager walMgr = this.wal; + + return walMgr == null ? 0 : walMgr.walArchiveSegments(); } /** {@inheritDoc} */ @@ -785,10 +805,14 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { } /** + * Callback on logging a record to a WAL. * + * @param size Record size in bytes. */ - public void onWalRecordLogged() { + public void onWalRecordLogged(long size) { walLoggingRate.increment(); + + walWrittenBytes.add(size); } /** @@ -826,4 +850,29 @@ public class DataStorageMetricsImpl implements DataStorageMetricsMXBean { walFsyncTimeDuration.reset(rateTimeInterval, subInts); walFsyncTimeNum.reset(rateTimeInterval, subInts); } + + /** {@inheritDoc} */ + @Override public long getWalWrittenBytes() { + if (!metricsEnabled) + return 0; + + return walWrittenBytes.value(); + } + + /** {@inheritDoc} */ + @Override public long getWalCompressedBytes() { + if (!metricsEnabled) + return 0; + + return walCompressedBytes.value(); + } + + /** + * Callback on compression of a WAL segment. + * + * @param size Size of the compressed segment in bytes. 
+ */ + public void onWalSegmentCompressed(long size) { + walCompressedBytes.add(size); + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java index a698508..7f65952 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/DataStorageMetricsSnapshot.java @@ -120,6 +120,12 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { /** */ private long sparseStorageSize; + /** Total number of logged bytes into the WAL. */ + private long walWrittenBytes; + + /** Total size of the compressed segments in bytes. */ + private long walCompressedBytes; + /** * @param metrics Metrics. */ @@ -153,6 +159,8 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { totalAllocatedSize = metrics.getTotalAllocatedSize(); storageSize = metrics.getStorageSize(); sparseStorageSize = metrics.getSparseStorageSize(); + walWrittenBytes = metrics.getWalWrittenBytes(); + walCompressedBytes = metrics.getWalCompressedBytes(); } /** {@inheritDoc} */ @@ -301,6 +309,16 @@ public class DataStorageMetricsSnapshot implements DataStorageMetrics { } /** {@inheritDoc} */ + @Override public long getWalWrittenBytes() { + return walWrittenBytes; + } + + /** {@inheritDoc} */ + @Override public long getWalCompressedBytes() { + return walCompressedBytes; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DataStorageMetricsSnapshot.class, this); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java 
index ebd859d..607561d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/wal/FileWriteAheadLogManager.java @@ -898,7 +898,7 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl } if (ptr != null) { - metrics.onWalRecordLogged(); + metrics.onWalRecordLogged(rec.size()); if (walAutoArchiveAfterInactivity > 0) lastRecordLoggedMs.set(U.currentTimeMillis()); @@ -2115,6 +2115,9 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl if (alreadyCompressed.length > 0) segmentAware.onSegmentCompressed(alreadyCompressed[alreadyCompressed.length - 1].idx()); + + for (FileDescriptor fd : alreadyCompressed) + metrics.onWalSegmentCompressed(fd.file().length()); } /** @@ -2238,8 +2241,12 @@ public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter impl f0.force(); } - segmentSize.put(segIdx, zip.length()); - segmentAware.addCurrentWalArchiveSize(zip.length()); + long zipLen = zip.length(); + + segmentSize.put(segIdx, zipLen); + segmentAware.addCurrentWalArchiveSize(zipLen); + + metrics.onWalSegmentCompressed(zipLen); segmentAware.onSegmentCompressed(segIdx); diff --git a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java index 3b44828..c038a67 100644 --- a/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java +++ b/modules/core/src/main/java/org/apache/ignite/mxbean/DataStorageMetricsMXBean.java @@ -195,4 +195,12 @@ public interface DataStorageMetricsMXBean extends DataStorageMetrics { /** {@inheritDoc} */ @MXBeanDescription("Storage space allocated adjusted for possible sparsity, in bytes.") @Override long getSparseStorageSize(); + + /** {@inheritDoc} */ + @MXBeanDescription("Getting the total 
number of logged bytes into the WAL.") + @Override long getWalWrittenBytes(); + + /** {@inheritDoc} */ + @MXBeanDescription("Getting the total size of the compressed segments in bytes.") + @Override long getWalCompressedBytes(); } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java index a06cad1..0be8220 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgniteDataStorageMetricsSelfTest.java @@ -20,8 +20,10 @@ package org.apache.ignite.internal.processors.cache.persistence; import java.io.Serializable; import java.util.Arrays; import java.util.Objects; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.UnaryOperator; import java.util.regex.Matcher; import java.util.regex.Pattern; import org.apache.ignite.DataRegionMetrics; @@ -38,21 +40,31 @@ import org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.pagemem.wal.record.DataRecord; +import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileDescriptor; +import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager; import org.apache.ignite.internal.processors.metric.MetricRegistry; import org.apache.ignite.internal.processors.metric.impl.AtomicLongMetric; +import 
org.apache.ignite.internal.processors.metric.impl.LongAdderMetric; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.PAX; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.mxbean.DataStorageMetricsMXBean; import org.apache.ignite.spi.metric.HistogramMetric; import org.apache.ignite.testframework.ListeningTestLogger; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; +import static java.util.Collections.emptyList; import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; import static org.apache.ignite.cache.CacheMode.PARTITIONED; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; import static org.apache.ignite.cluster.ClusterState.ACTIVE; import static org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl.DATASTORAGE_METRIC_PREFIX; +import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE; +import static org.apache.ignite.testframework.GridTestUtils.setFieldValue; import static org.apache.ignite.testframework.GridTestUtils.waitForCondition; /** @@ -305,6 +317,82 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { } /** + * Checking that the metrics of the total logged bytes are working correctly. + * + * @throws Exception If failed. 
+ */ + @Test + public void testWalWrittenBytes() throws Exception { + IgniteEx n = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> { + cfg.getDataStorageConfiguration().setWalSegmentSize((int)(2 * U.MB)); + + return cfg; + }); + + n.cluster().state(ACTIVE); + awaitPartitionMapExchange(); + + for (int i = 0; i < 10; i++) + n.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]); + + WALDisableContext walDisableCtx = n.context().cache().context().walState().walDisableContext(); + assertNotNull(walDisableCtx); + + setFieldValue(walDisableCtx, "disableWal", true); + + assertTrue(walDisableCtx.check()); + assertNull(walMgr(n).log(new DataRecord(emptyList()))); + + assertEquals(-1, walMgr(n).lastArchivedSegment()); + + long exp = walMgr(n).lastWritePointer().fileOffset() - HEADER_RECORD_SIZE; + + assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalWrittenBytes()); + assertEquals(exp, dsMetricsMXBean(n).getWalWrittenBytes()); + assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalWrittenBytes")).value()); + } + + /** + * Checking that the metrics of the total size compressed segment are working correctly. + * + * @throws Exception If failed. 
+ */ + @Test + public void testWalCompressedBytes() throws Exception { + IgniteEx n0 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> { + cfg.getDataStorageConfiguration().setWalCompactionEnabled(true).setWalSegmentSize((int)(2 * U.MB)); + + return cfg; + }); + + n0.cluster().state(ACTIVE); + awaitPartitionMapExchange(); + + while (walMgr(n0).lastArchivedSegment() < 3) + n0.cache("cache").put(ThreadLocalRandom.current().nextLong(), new byte[(int)(32 * U.KB)]); + + waitForCondition( + () -> walMgr(n0).lastArchivedSegment() == walMgr(n0).lastCompactedSegment(), + getTestTimeout() + ); + + assertCorrectWalCompressedBytesMetrics(n0); + + stopAllGrids(); + + IgniteEx n1 = startGrid(0, (UnaryOperator<IgniteConfiguration>)cfg -> { + cfg.getDataStorageConfiguration().setWalCompactionEnabled(true); + + return cfg; + }); + + n1.cluster().state(ACTIVE); + awaitPartitionMapExchange(); + + assertCorrectWalCompressedBytesMetrics(n1); + } + + /** * */ static class Person implements Serializable { @@ -351,4 +439,58 @@ public class IgniteDataStorageMetricsSelfTest extends GridCommonAbstractTest { return Objects.hash(fName, lName); } } + + /** + * Getting WAL manager. + * + * @param n Node. + * @return WAL manager. + */ + private FileWriteAheadLogManager walMgr(IgniteEx n) { + return (FileWriteAheadLogManager)n.context().cache().context().wal(); + } + + /** + * Getting db manager of node. + * + * @param n Node. + * @return Db manager. + */ + private GridCacheDatabaseSharedManager dbMgr(IgniteEx n) { + return (GridCacheDatabaseSharedManager)n.context().cache().context().database(); + } + + /** + * Getting DATASTORAGE_METRIC_PREFIX metric registry. + * + * @param n Node. + * @return Group of metrics. + */ + private MetricRegistry dsMetricRegistry(IgniteEx n) { + return n.context().metric().registry(DATASTORAGE_METRIC_PREFIX); + } + + /** + * Getting data storage MXBean. + * + * @param n Node. + * @return MXBean. 
+ */ + private DataStorageMetricsMXBean dsMetricsMXBean(IgniteEx n) { + return getMxBean(n.name(), "Persistent Store", "DataStorageMetrics", DataStorageMetricsMXBean.class); + } + + /** + * Check that the metric of the total size compressed segment is working correctly. + * + * @param n Node. + */ + private void assertCorrectWalCompressedBytesMetrics(IgniteEx n) { + long exp = Arrays.stream(walMgr(n).walArchiveFiles()).filter(FileDescriptor::isCompressed) + .mapToLong(fd -> fd.file().length()).sum(); + + assertEquals(exp, dbMgr(n).persistentStoreMetrics().getWalCompressedBytes()); + assertEquals(exp, dsMetricsMXBean(n).getWalCompressedBytes()); + assertEquals(exp, ((LongAdderMetric)dsMetricRegistry(n).findMetric("WalCompressedBytes")).value()); + } }