This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 09a7f32c172 IGNITE-26182 Add metrics for log storage sizes (#7647)
09a7f32c172 is described below

commit 09a7f32c1720d00bd776e916b8fd51a14379da51
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Tue Feb 24 16:51:58 2026 +0400

    IGNITE-26182 Add metrics for log storage sizes (#7647)
---
 .../ignite/internal/cli/CliIntegrationTest.java    |   1 +
 .../raftsnapshot/ItLogStorageMetricsTest.java      |  88 +++++++++++
 .../storage/impl/DefaultLogStorageManager.java     |  11 ++
 .../storage/impl/DefaultLogStorageManagerTest.java |  15 +-
 .../rest/metrics/ItMetricControllerTest.java       |   1 +
 modules/runner/build.gradle                        |   2 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |  13 ++
 .../metrics/logstorage/LogStorageMetricSource.java |  96 ++++++++++++
 .../metrics/logstorage/LogStorageMetrics.java      | 157 +++++++++++++++++++
 .../logstorage/LogStorageMetricSourceTest.java     | 101 ++++++++++++
 .../metrics/logstorage/LogStorageMetricsTest.java  | 169 +++++++++++++++++++++
 11 files changed, 642 insertions(+), 12 deletions(-)

diff --git 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 00eae74a96f..11601cfa5a2 100644
--- 
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ 
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -87,6 +87,7 @@ public abstract class CliIntegrationTest extends 
ClusterPerClassIntegrationTest
             new MetricSource().name("index.builder").enabled(true),
             new MetricSource().name("raft.snapshots").enabled(true),
             new MetricSource().name("messaging").enabled(true),
+            new MetricSource().name("log.storage").enabled(true),
             new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + 
".striped.messaging.inbound.default").enabled(true),
             new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + 
".striped.messaging.inbound.deploymentunits").enabled(true),
             new MetricSource().name(THREAD_POOLS_METRICS_SOURCE_NAME + 
".striped.messaging.inbound.scalecube").enabled(true),
diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItLogStorageMetricsTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItLogStorageMetricsTest.java
new file mode 100644
index 00000000000..8ff4a645a3e
--- /dev/null
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/internal/raftsnapshot/ItLogStorageMetricsTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raftsnapshot;
+
+import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Random;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.raft.storage.impl.DefaultLogStorageManager;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class ItLogStorageMetricsTest extends ClusterPerTestIntegrationTest {
+    private static final String TABLE_NAME = "TEST_TABLE";
+
+    private Ignite node;
+
+    @Override
+    protected int initialNodes() {
+        return 1;
+    }
+
+    @BeforeEach
+    void prepare() {
+        node = cluster.node(0);
+    }
+
+    @Test
+    void totalLogStorageMetricIsUpdated() throws Exception {
+        LongGauge totalLogStorageSize = totalLogStorageSizeGauge();
+
+        int valueLength = 1_000_000;
+
+        feedLogStorageWithBlob(valueLength);
+
+        await().alias("Total log storage size should reach the expected value")
+                .until(totalLogStorageSize::value, 
is(greaterThanOrEqualTo((long) valueLength)));
+    }
+
+    private LongGauge totalLogStorageSizeGauge() {
+        MetricSet logStorageMetrics = unwrapIgniteImpl(node)
+                .metricManager()
+                .metricSnapshot()
+                .metrics()
+                .get("log.storage");
+        assertThat(logStorageMetrics, is(notNullValue()));
+
+        LongGauge totalLogStorageSize = 
logStorageMetrics.get("TotalLogStorageSize");
+        assertThat(totalLogStorageSize, is(notNullValue()));
+
+        return totalLogStorageSize;
+    }
+
+    private void feedLogStorageWithBlob(int valueLength) throws Exception {
+        node.sql().executeScript("CREATE TABLE " + TABLE_NAME + "(ID INT 
PRIMARY KEY, VAL VARBINARY(" + valueLength + "))");
+
+        node.tables().table(TABLE_NAME)
+                .keyValueView(Integer.class, byte[].class)
+                .put(1, randomBytes(new Random(), valueLength));
+
+        DefaultLogStorageManager logStorageManager = 
(DefaultLogStorageManager) unwrapIgniteImpl(node).partitionsLogStorageManager();
+        logStorageManager.flushSstFiles();
+    }
+}
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
index 06afc56eddb..84123ef59f5 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManager.java
@@ -62,6 +62,7 @@ import org.rocksdb.CompactionStyle;
 import org.rocksdb.CompressionType;
 import org.rocksdb.DBOptions;
 import org.rocksdb.Env;
+import org.rocksdb.FlushOptions;
 import org.rocksdb.Priority;
 import org.rocksdb.ReadOptions;
 import org.rocksdb.RocksDB;
@@ -436,6 +437,16 @@ public class DefaultLogStorageManager implements 
LogStorageManager {
         return db;
     }
 
+    /**
+     * Flushes memtables of the meta, conf and data column families to SST files, waiting for the flush to complete.
+     */
+    @TestOnly
+    public void flushSstFiles() throws Exception {
+        try (var flushOptions = new FlushOptions().setWaitForFlush(true)) {
+            db.flush(flushOptions, List.of(metaHandle, confHandle, 
dataHandle));
+        }
+    }
+
     @TestOnly
     ColumnFamilyHandle metaColumnFamilyHandle() {
         return metaHandle;
diff --git 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
index 18368c431d9..bee12132c89 100644
--- 
a/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
+++ 
b/modules/raft/src/test/java/org/apache/ignite/internal/raft/storage/impl/DefaultLogStorageManagerTest.java
@@ -57,8 +57,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
 import org.junit.jupiter.api.extension.ExtendWith;
-import org.rocksdb.FlushOptions;
-import org.rocksdb.RocksDBException;
 
 @ExtendWith(WorkDirectoryExtension.class)
 class DefaultLogStorageManagerTest {
@@ -76,7 +74,7 @@ class DefaultLogStorageManagerTest {
         logStorageOptions.setConfigurationManager(new ConfigurationManager());
         
logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());
 
-        boolean disableFsync = 
testInfo.getTestMethod().get().isAnnotationPresent(DisableFsync.class);
+        boolean disableFsync = 
testInfo.getTestMethod().orElseThrow().isAnnotationPresent(DisableFsync.class);
         logStorageManager = new DefaultLogStorageManager("test", "test", 
workDir, !disableFsync);
 
         startFactory();
@@ -308,7 +306,7 @@ class DefaultLogStorageManagerTest {
     }
 
     @Test
-    void totalBytesOnDiskAccountsForWal() throws Exception {
+    void totalBytesOnDiskAccountsForWal() {
         int entrySize = 1000;
 
         long originalSize = logStorageManager.totalBytesOnDisk();
@@ -331,17 +329,10 @@ class DefaultLogStorageManagerTest {
         logStorage.appendEntry(dataLogEntry(1, randomBytes(new Random(), 
entrySize)));
 
         // Make sure SST files are accounted for.
-        flushSstFiles();
+        logStorageManager.flushSstFiles();
         assertThat(logStorageManager.totalBytesOnDisk(), 
is(greaterThanOrEqualTo(originalSize + entrySize)));
     }
 
-    private void flushSstFiles() throws RocksDBException {
-        try (var flushOptions = new FlushOptions().setWaitForFlush(true)) {
-            //noinspection resource
-            logStorageManager.db().flush(flushOptions);
-        }
-    }
-
     @Retention(RetentionPolicy.RUNTIME)
     @Target(ElementType.METHOD)
     private @interface DisableFsync {
diff --git 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 6f2ee5385ac..3cde2837d0f 100644
--- 
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++ 
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -75,6 +75,7 @@ class ItMetricControllerTest extends 
ClusterPerClassIntegrationTest {
             new MetricSource("index.builder", true),
             new MetricSource("raft.snapshots", true),
             new MetricSource("messaging", true),
+            new MetricSource("log.storage", true),
             new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + 
".striped.messaging.inbound.default", true),
             new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + 
".striped.messaging.inbound.deploymentunits", true),
             new MetricSource(THREAD_POOLS_METRICS_SOURCE_NAME + 
".striped.messaging.inbound.scalecube", true),
diff --git a/modules/runner/build.gradle b/modules/runner/build.gradle
index 0ee64387313..db94bccfa69 100644
--- a/modules/runner/build.gradle
+++ b/modules/runner/build.gradle
@@ -121,12 +121,14 @@ dependencies {
     testImplementation testFixtures(project(':ignite-vault'))
     testImplementation testFixtures(project(':ignite-metastorage'))
     testImplementation testFixtures(project(':ignite-jdbc'))
+    testImplementation testFixtures(project(':ignite-metrics'))
     testImplementation(libs.jsonpath.assert) {
         //IDEA test runner doesn't apply Gradle dependency resolve strategy, 
this is just not implemented
         //So, exclude asm-core transitive dependency to protect of jar-hell.
         exclude group: 'org.ow2.asm', module: 'asm'
     }
     testImplementation libs.auto.service.annotations
+    testImplementation libs.awaitility
 
     integrationTestAnnotationProcessor 
project(':ignite-configuration-annotation-processor')
     integrationTestAnnotationProcessor libs.jmh.annotation.processor
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 0727c4ab1bd..c91a98f304c 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -171,6 +171,7 @@ import 
org.apache.ignite.internal.metastorage.server.raft.MetastorageGroupId;
 import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import 
org.apache.ignite.internal.metrics.configuration.MetricExtensionConfiguration;
+import org.apache.ignite.internal.metrics.logstorage.LogStorageMetrics;
 import org.apache.ignite.internal.metrics.messaging.MetricMessaging;
 import org.apache.ignite.internal.metrics.sources.ClockServiceMetricSource;
 import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
@@ -451,6 +452,8 @@ public class IgniteImpl implements Ignite {
     /** Creator for volatile {@link LogStorageManager} instances. */
     private final VolatileLogStorageManagerCreator 
volatileLogStorageManagerCreator;
 
+    private final LogStorageMetrics logStorageMetrics;
+
     private final SystemPropertiesComponent systemPropertiesComponent;
 
     /** A hybrid logical clock. */
@@ -925,6 +928,15 @@ public class IgniteImpl implements Ignite {
 
         volatileLogStorageManagerCreator = new 
VolatileLogStorageManagerCreator(name, 
workDir.resolve("volatile-log-spillout"));
 
+        logStorageMetrics = new LogStorageMetrics(
+                name,
+                metricManager,
+                cmgLogStorageManager,
+                msLogStorageManager,
+                partitionsLogStorageManager,
+                volatileLogStorageManagerCreator
+        );
+
         schemaSafeTimeTracker = new 
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime());
         
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
 
@@ -1630,6 +1642,7 @@ public class IgniteImpl implements Ignite {
                                 distributionZoneManager,
                                 computeComponent,
                                 volatileLogStorageManagerCreator,
+                                logStorageMetrics,
                                 replicaMgr,
                                 indexNodeFinishedRwTransactionsChecker,
                                 txManager,
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSource.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSource.java
new file mode 100644
index 00000000000..35e66bb3b23
--- /dev/null
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSource.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSource;
+import 
org.apache.ignite.internal.metrics.logstorage.LogStorageMetricSource.Holder;
+
+/**
+ * {@link MetricSource} for log storage metrics.
+ */
+class LogStorageMetricSource extends AbstractMetricSource<Holder> {
+    static final String NAME = "log.storage";
+
+    private volatile long cmgLogStorageSizeBytes;
+    private volatile long metastorageLogStorageSizeBytes;
+    private volatile long partitionsLogStorageSizeBytes;
+
+    LogStorageMetricSource() {
+        super(NAME, "Log storage metrics.");
+    }
+
+    void cmgLogStorageSize(long newSize) {
+        this.cmgLogStorageSizeBytes = newSize;
+    }
+
+    void metastorageLogStorageSize(long newSize) {
+        this.metastorageLogStorageSizeBytes = newSize;
+    }
+
+    void partitionsLogStorageSize(long newSize) {
+        this.partitionsLogStorageSizeBytes = newSize;
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder();
+    }
+
+    protected class Holder implements AbstractMetricSource.Holder<Holder> {
+        private final LongMetric cmgLogStorageSize = new LongGauge(
+                "CmgLogStorageSize",
+                "Number of bytes occupied on disk by the CMG log.",
+                () -> cmgLogStorageSizeBytes
+        );
+
+        private final LongMetric metastorageLogStorageSize = new LongGauge(
+                "MetastorageLogStorageSize",
+                "Number of bytes occupied on disk by the Metastorage group 
log.",
+                () -> metastorageLogStorageSizeBytes
+        );
+
+        private final LongMetric partitionsLogStorageSize = new LongGauge(
+                "PartitionsLogStorageSize",
+                "Number of bytes occupied on disk by the partitions groups 
logs.",
+                () -> partitionsLogStorageSizeBytes
+        );
+
+        private final LongMetric totalLogStorageSize = new LongGauge(
+                "TotalLogStorageSize",
+                "Number of bytes occupied on disk by logs of all replication 
groups.",
+                () -> cmgLogStorageSizeBytes + metastorageLogStorageSizeBytes 
+ partitionsLogStorageSizeBytes
+        );
+
+        private final List<Metric> metrics = List.of(
+                cmgLogStorageSize,
+                metastorageLogStorageSize,
+                partitionsLogStorageSize,
+                totalLogStorageSize
+        );
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return metrics;
+        }
+    }
+}
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetrics.java
 
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetrics.java
new file mode 100644
index 00000000000..f5f073b26cf
--- /dev/null
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetrics.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.manager.IgniteComponent;
+import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.raft.storage.LogStorageManager;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageManagerCreator;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
+
+/**
+ * Component that periodically polls the on-disk sizes of log storages and publishes them as metrics.
+ */
+public class LogStorageMetrics implements IgniteComponent {
+    private static final IgniteLogger LOG = 
Loggers.forClass(LogStorageMetrics.class);
+
+    private static final long DEFAULT_UPDATE_PERIOD_MS = 1000;
+
+    private final String nodeName;
+
+    private final MetricManager metricManager;
+
+    private final LogStorageManager cmgLogStorageManager;
+    private final LogStorageManager metastorageLogStorageManager;
+    private final LogStorageManager partitionsLogStorageManager;
+    private final VolatileLogStorageManagerCreator 
volatileLogStorageManagerCreator;
+
+    private final long updatePeriodMs;
+
+    private final LogStorageMetricSource metricSource = new 
LogStorageMetricSource();
+
+    private volatile ScheduledExecutorService executorService;
+    private volatile ScheduledFuture<?> taskFuture;
+
+    /** Constructor that uses the default metrics update period. */
+    public LogStorageMetrics(
+            String nodeName,
+            MetricManager metricManager,
+            LogStorageManager cmgLogStorageManager,
+            LogStorageManager metastorageLogStorageManager,
+            LogStorageManager partitionsLogStorageManager,
+            VolatileLogStorageManagerCreator volatileLogStorageManagerCreator
+    ) {
+        this(
+                nodeName,
+                metricManager,
+                cmgLogStorageManager,
+                metastorageLogStorageManager,
+                partitionsLogStorageManager,
+                volatileLogStorageManagerCreator,
+                DEFAULT_UPDATE_PERIOD_MS
+        );
+    }
+
+    /** Constructor that accepts a custom metrics update period, in milliseconds. */
+    LogStorageMetrics(
+            String nodeName,
+            MetricManager metricManager,
+            LogStorageManager cmgLogStorageManager,
+            LogStorageManager metastorageLogStorageManager,
+            LogStorageManager partitionsLogStorageManager,
+            VolatileLogStorageManagerCreator volatileLogStorageManagerCreator,
+            long updatePeriodMs
+    ) {
+        this.nodeName = nodeName;
+        this.metricManager = metricManager;
+        this.cmgLogStorageManager = cmgLogStorageManager;
+        this.metastorageLogStorageManager = metastorageLogStorageManager;
+        this.partitionsLogStorageManager = partitionsLogStorageManager;
+        this.volatileLogStorageManagerCreator = 
volatileLogStorageManagerCreator;
+        this.updatePeriodMs = updatePeriodMs;
+    }
+
+    @Override
+    public CompletableFuture<Void> startAsync(ComponentContext 
componentContext) {
+        metricManager.registerSource(metricSource);
+        metricManager.enable(metricSource);
+
+        executorService = Executors.newSingleThreadScheduledExecutor(
+                IgniteThreadFactory.create(nodeName, 
"log-storage-metrics-collector", LOG)
+        );
+        taskFuture = executorService.scheduleAtFixedRate(this::updateMetrics, 
0, updatePeriodMs, MILLISECONDS);
+
+        return nullCompletedFuture();
+    }
+
+    private void updateMetrics() {
+        try {
+            
metricSource.cmgLogStorageSize(cmgLogStorageManager.totalBytesOnDisk());
+            
metricSource.metastorageLogStorageSize(metastorageLogStorageManager.totalBytesOnDisk());
+            metricSource.partitionsLogStorageSize(
+                    partitionsLogStorageManager.totalBytesOnDisk() + 
volatileLogStorageManagerCreator.totalBytesOnDisk()
+            );
+        } catch (Exception | AssertionError e) {
+            if (!hasCause(e, NodeStoppingException.class)) {
+                LOG.warn("Failed to update log storage metrics", e);
+            }
+        } catch (Error e) {
+            LOG.error("Failed to update log storage metrics, no more updates 
will happen", e);
+
+            throw e;
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> stopAsync(ComponentContext 
componentContext) {
+        ScheduledFuture<?> futureToCancel = taskFuture;
+        if (futureToCancel != null) {
+            futureToCancel.cancel(true);
+        }
+
+        ScheduledExecutorService executorToShutdown = executorService;
+        if (executorToShutdown != null) {
+            executorToShutdown.shutdownNow();
+
+            // If we fail to finish waiting, our task might end up hitting a 
stopped RocksDB instance that will crash the node.
+            try {
+                executorToShutdown.awaitTermination(Long.MAX_VALUE, SECONDS);
+            } catch (InterruptedException e) {
+                // Ok, we are interrupted, cannot wait any longer.
+                Thread.currentThread().interrupt();
+            }
+        }
+
+        metricManager.unregisterSource(metricSource);
+
+        return nullCompletedFuture();
+    }
+}
diff --git 
a/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSourceTest.java
 
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSourceTest.java
new file mode 100644
index 00000000000..e2b70a553c3
--- /dev/null
+++ 
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricSourceTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import static java.util.stream.Collectors.toUnmodifiableSet;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Set;
+import java.util.stream.StreamSupport;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class LogStorageMetricSourceTest {
+    private final LogStorageMetricSource metricSource = new 
LogStorageMetricSource();
+
+    private MetricSet metricSet;
+
+    @BeforeEach
+    void setUp() {
+        MetricSet ms = metricSource.enable();
+        assertThat(ms, is(notNullValue()));
+        metricSet = ms;
+    }
+
+    @Test
+    void metricSetIsAsExpected() {
+        Set<String> metricNames = 
StreamSupport.stream(metricSet.spliterator(), false)
+                .map(Metric::name)
+                .collect(toUnmodifiableSet());
+
+        assertThat(
+                metricNames,
+                is(Set.of("CmgLogStorageSize", "MetastorageLogStorageSize", 
"PartitionsLogStorageSize", "TotalLogStorageSize"))
+        );
+    }
+
+    @Test
+    void metricsAreInitializedToZero() {
+        assertThatLongGaugeHasValue("CmgLogStorageSize", 0);
+        assertThatLongGaugeHasValue("MetastorageLogStorageSize", 0);
+        assertThatLongGaugeHasValue("PartitionsLogStorageSize", 0);
+        assertThatLongGaugeHasValue("TotalLogStorageSize", 0);
+    }
+
+    private void assertThatLongGaugeHasValue(String metricName, long 
expectedValue) {
+        LongGauge gauge = metricSet.get(metricName);
+
+        assertThat(gauge, is(notNullValue()));
+        assertThat(gauge.value(), is(expectedValue));
+    }
+
+    @Test
+    void cmgLogStorageSizeIsUpdated() {
+        metricSource.cmgLogStorageSize(100);
+
+        assertThatLongGaugeHasValue("CmgLogStorageSize", 100);
+    }
+
+    @Test
+    void metastorageLogStorageSizeIsUpdated() {
+        metricSource.metastorageLogStorageSize(200);
+
+        assertThatLongGaugeHasValue("MetastorageLogStorageSize", 200);
+    }
+
+    @Test
+    void partitionsLogStorageSizeIsUpdated() {
+        metricSource.partitionsLogStorageSize(300);
+
+        assertThatLongGaugeHasValue("PartitionsLogStorageSize", 300);
+    }
+
+    @Test
+    void totalLogStorageSizeIsUpdated() {
+        metricSource.cmgLogStorageSize(1);
+        metricSource.metastorageLogStorageSize(10);
+        metricSource.partitionsLogStorageSize(100);
+
+        assertThatLongGaugeHasValue("TotalLogStorageSize", 111);
+    }
+}
diff --git 
a/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
 
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
new file mode 100644
index 00000000000..67bc7cd5896
--- /dev/null
+++ 
b/modules/runner/src/test/java/org/apache/ignite/internal/metrics/logstorage/LogStorageMetricsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.logstorage;
+
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
+import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.awaitility.Awaitility.await;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.isA;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.nio.ByteBuffer;
+import java.nio.file.Path;
+import java.util.Random;
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.raft.storage.LogStorageManager;
+import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageManagerCreator;
+import org.apache.ignite.internal.raft.util.SharedLogStorageManagerUtils;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.testframework.WorkDirectory;
+import org.apache.ignite.internal.testframework.WorkDirectoryExtension;
+import org.apache.ignite.raft.jraft.conf.ConfigurationManager;
+import org.apache.ignite.raft.jraft.entity.EnumOutter.EntryType;
+import org.apache.ignite.raft.jraft.entity.LogEntry;
+import org.apache.ignite.raft.jraft.entity.LogId;
+import org.apache.ignite.raft.jraft.entity.codec.DefaultLogEntryCodecFactory;
+import org.apache.ignite.raft.jraft.option.LogStorageOptions;
+import org.apache.ignite.raft.jraft.option.RaftOptions;
+import org.apache.ignite.raft.jraft.storage.LogStorage;
+import org.hamcrest.Matcher;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(WorkDirectoryExtension.class)
+class LogStorageMetricsTest {
+    @WorkDirectory
+    private Path workDir;
+
+    private LogStorageManager cmgLogStorageManager;
+    private LogStorageManager metastorageLogStorageManager;
+    private LogStorageManager partitionsLogStorageManager;
+
+    private VolatileLogStorageManagerCreator volatileLogStorageManagerCreator;
+
+    private final TestMetricManager metricManager = new TestMetricManager();
+
+    private LogStorageMetrics logStorageMetrics;
+
+    private final RaftOptions raftOptions = new RaftOptions();
+
+    private final LogStorageOptions logStorageOptions = new 
LogStorageOptions();
+
+    @BeforeEach
+    void setUp() {
+        logStorageOptions.setConfigurationManager(new ConfigurationManager());
+        
logStorageOptions.setLogEntryCodecFactory(DefaultLogEntryCodecFactory.getInstance());
+
+        String nodeName = "test";
+
+        cmgLogStorageManager = SharedLogStorageManagerUtils.create(nodeName, 
workDir.resolve("cmg"));
+        metastorageLogStorageManager = 
SharedLogStorageManagerUtils.create(nodeName, workDir.resolve("metastorage"));
+        partitionsLogStorageManager = 
SharedLogStorageManagerUtils.create(nodeName, workDir.resolve("partitions"));
+
+        volatileLogStorageManagerCreator = new 
VolatileLogStorageManagerCreator(nodeName, workDir.resolve("spillout"));
+
+        logStorageMetrics = new LogStorageMetrics(
+                nodeName,
+                metricManager,
+                cmgLogStorageManager,
+                metastorageLogStorageManager,
+                partitionsLogStorageManager,
+                volatileLogStorageManagerCreator,
+                10
+        );
+
+        CompletableFuture<Void> startFuture = startAsync(
+                new ComponentContext(),
+                cmgLogStorageManager,
+                metastorageLogStorageManager,
+                partitionsLogStorageManager,
+                volatileLogStorageManagerCreator,
+                logStorageMetrics
+        );
+        assertThat(startFuture, willCompleteSuccessfully());
+    }
+
+    @AfterEach
+    void cleanup() {
+        CompletableFuture<Void> stopFuture = stopAsync(
+                new ComponentContext(),
+                logStorageMetrics,
+                volatileLogStorageManagerCreator,
+                partitionsLogStorageManager,
+                metastorageLogStorageManager,
+                cmgLogStorageManager
+        );
+        assertThat(stopFuture, willCompleteSuccessfully());
+    }
+
+    @Test
+    void cmgLogStorageSizeIsAccountedFor() {
+        testLogStorageSizeIsAccountedFor(cmgLogStorageManager, "cmg", 
"CmgLogStorageSize");
+    }
+
+    private void testLogStorageSizeIsAccountedFor(LogStorageManager 
logStorageManager, String groupUri, String metricName) {
+        LogStorage logStorage = logStorageManager.createLogStorage(groupUri, 
raftOptions);
+        logStorage.init(logStorageOptions);
+
+        logStorage.appendEntry(dataLogEntry(1, randomBytes(new Random(), 
1000)));
+
+        waitForLongGaugeValue(metricName, is(greaterThanOrEqualTo(1000L)));
+        waitForLongGaugeValue("TotalLogStorageSize", 
is(greaterThanOrEqualTo(1000L)));
+    }
+
+    private static LogEntry dataLogEntry(int index, byte[] content) {
+        LogEntry logEntry = new LogEntry();
+
+        logEntry.setId(new LogId(index, 1));
+        logEntry.setType(EntryType.ENTRY_TYPE_DATA);
+        logEntry.setData(ByteBuffer.wrap(content));
+
+        return logEntry;
+    }
+
+    private void waitForLongGaugeValue(String metricName, Matcher<Long> 
valueMatcher) {
+        Metric metric = metricManager.metric(LogStorageMetricSource.NAME, 
metricName);
+        assertThat(metric, isA(LongGauge.class));
+        LongGauge gauge = (LongGauge) metric;
+
+        assertThat(gauge, is(notNullValue()));
+
+        await().until(gauge::value, valueMatcher);
+    }
+
+    @Test
+    void metastorageLogStorageSizeIsAccountedFor() {
+        testLogStorageSizeIsAccountedFor(metastorageLogStorageManager, 
"metastorage", "MetastorageLogStorageSize");
+    }
+
+    @Test
+    void partitionsLogStorageSizeIsAccountedFor() {
+        testLogStorageSizeIsAccountedFor(partitionsLogStorageManager, new 
ZonePartitionId(1, 0).toString(), "PartitionsLogStorageSize");
+    }
+}


Reply via email to