This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ffc6c83f12a IGNITE-27359 Add aipersist runConsistently metrics (#7422)
ffc6c83f12a is described below
commit ffc6c83f12a7c8d02e21d281a07f36a50af577ee
Author: Viacheslav Blinov <[email protected]>
AuthorDate: Fri Feb 27 14:04:16 2026 +0300
IGNITE-27359 Add aipersist runConsistently metrics (#7422)
---
.../ignite/internal/metrics/MetricMatchers.java | 93 +++++++++++++++
.../checkpoint/CheckpointTimeoutLockTest.java | 34 +++---
.../replacement/AbstractPageReplacementTest.java | 11 +-
.../persistence/store/FilePageStoreIoTest.java | 41 +++----
.../PersistentPageMemoryStorageEngine.java | 12 +-
.../PersistentPageMemoryStorageMetricSource.java | 85 --------------
.../PersistentPageMemoryStorageMetrics.java | 3 +-
.../PersistentPageMemoryTableStorage.java | 12 +-
.../mv/PersistentPageMemoryMvPartitionStorage.java | 56 +++++++--
.../pagememory/mv/RunConsistentlyMetrics.java | 125 +++++++++++++++++++++
.../PersistentPageMemoryStorageMetricsTest.java | 3 +-
.../pagememory/mv/FailedCheckpointTest.java | 9 +-
...PersistentPageMemoryMvPartitionStorageTest.java | 74 +++++++++++-
13 files changed, 395 insertions(+), 163 deletions(-)
diff --git
a/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/MetricMatchers.java
b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/MetricMatchers.java
new file mode 100644
index 00000000000..f8a20843329
--- /dev/null
+++
b/modules/metrics/src/testFixtures/java/org/apache/ignite/internal/metrics/MetricMatchers.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics;
+
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.util.Arrays;
+import org.hamcrest.FeatureMatcher;
+import org.hamcrest.Matcher;
+
+/**
+ * Hamcrest matchers for testing metrics.
+ */
+public final class MetricMatchers {
+ private MetricMatchers() {
+ // No-op.
+ }
+
+ /**
+ * Creates a matcher that matches a {@link DistributionMetric} with the
expected total number of measurements
+ * across all histogram buckets.
+ *
+ * @param expectedMeasuresCount Expected total number of measurements
across all buckets.
+ * @return Matcher for distribution metric measures count.
+ */
+ public static Matcher<DistributionMetric> hasMeasurementsCount(long
expectedMeasuresCount) {
+ return new FeatureMatcher<>(
+ is(expectedMeasuresCount),
+ "a DistributionMetric with measures count",
+ "measures count") {
+ @Override
+ protected Long featureValueOf(DistributionMetric metric) {
+ return Arrays.stream(metric.value()).sum();
+ }
+ };
+ }
+
+ /**
+ * Creates a matcher that matches a {@link LongMetric} whose value
satisfies the given matcher.
+ *
+ * @param valueMatcher Matcher for the metric value.
+ * @return Matcher for long metric value.
+ */
+ public static Matcher<LongMetric> hasValue(Matcher<Long> valueMatcher) {
+ return new FeatureMatcher<>(
+ valueMatcher,
+ "a LongMetric with value",
+ "value") {
+ @Override
+ protected Long featureValueOf(LongMetric metric) {
+ return metric.value();
+ }
+ };
+ }
+
+ /**
+ * Creates a matcher that matches a {@link MetricSet} containing a metric
with the given name
+ * that satisfies the given matcher.
+ *
+ * @param name Name of the metric to look up in the metric set.
+ * @param metricMatcher Matcher for the metric.
+ * @param <M> Type of the metric.
+ * @return Matcher for metric set containing the specified metric.
+ */
+ public static <M extends Metric> Matcher<MetricSet> hasMetric(String name,
Matcher<M> metricMatcher) {
+ return new FeatureMatcher<>(
+ allOf(notNullValue(), metricMatcher),
+ "a MetricSet with metric named \"" + name + "\"",
+ "metric named \"" + name + "\"") {
+ @Override
+ protected M featureValueOf(MetricSet metricSet) {
+ return metricSet.get(name);
+ }
+ };
+ }
+}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
index 9aafb84b044..e151550b2c2 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/checkpoint/CheckpointTimeoutLockTest.java
@@ -21,6 +21,7 @@ import static java.lang.Thread.currentThread;
import static java.util.concurrent.CompletableFuture.failedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
import static
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER;
import static
org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.LOCK_RELEASED;
@@ -42,7 +43,6 @@ import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import java.util.Arrays;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -53,7 +53,6 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.metrics.DistributionMetric;
import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
@@ -438,17 +437,26 @@ public class CheckpointTimeoutLockTest extends
BaseIgniteAbstractTest {
try {
// Verify metrics start at zero
-
assertDistributionMetricRecordsCount(metrics.readLockAcquisitionTime(), 0L);
+ assertThat(
+ metrics.readLockAcquisitionTime(),
+ hasMeasurementsCount(0L)
+ );
// Acquire and immediately release the lock
timeoutLock.checkpointReadLock();
timeoutLock.checkpointReadUnlock();
// Verify acquisition was recorded
-
assertDistributionMetricRecordsCount(metrics.readLockAcquisitionTime(), 1L);
+ assertThat(
+ metrics.readLockAcquisitionTime(),
+ hasMeasurementsCount(1L)
+ );
// Verify hold time distribution was recorded
- assertDistributionMetricRecordsCount(metrics.readLockHoldTime(),
1L);
+ assertThat(
+ metrics.readLockHoldTime(),
+ hasMeasurementsCount(1L)
+ );
readWriteLock.writeLock();
runAsync(() -> {
@@ -462,20 +470,4 @@ public class CheckpointTimeoutLockTest extends
BaseIgniteAbstractTest {
timeoutLock.stop();
}
}
-
- /**
- * Verifies that the specified distribution metric has recorded the
expected total number of measurements.
- *
- * <p>
- * Rather than checking individual histogram buckets, this method
aggregates all recorded measurements across every bucket
- * and confirms that the expected interaction was captured in at least one
of them.
- */
- private static void
assertDistributionMetricRecordsCount(DistributionMetric metric, long
expectedMeasuresCount) {
- long totalMeasuresCount = Arrays.stream(metric.value()).sum();
- assertThat(
- "Unexpected total measures count in distribution metric " +
metric.name(),
- totalMeasuresCount,
- is(expectedMeasuresCount)
- );
- }
}
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
index 723871f0586..76525174066 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/replacement/AbstractPageReplacementTest.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.pagememory.persistence.replacement;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasMetric;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource.DIRTY_PAGES;
import static
org.apache.ignite.internal.pagememory.persistence.PersistentPageMemoryMetricSource.LOADED_PAGES;
@@ -36,7 +38,6 @@ import static
org.apache.ignite.internal.util.GridUnsafe.freeBuffer;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.notNullValue;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -60,7 +61,6 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
import org.apache.ignite.internal.lang.RunnableX;
-import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.TestDataRegion;
@@ -374,12 +374,7 @@ public abstract class AbstractPageReplacementTest extends
IgniteAbstractTest {
}
private void assertMetricValue(String metricName, Matcher<Long>
valueMatcher) {
- LongMetric metric = metricSet.get(metricName);
- assertThat(metric, is(notNullValue()));
- assertThat(
- metric.value(),
- valueMatcher
- );
+ assertThat(metricSet, hasMetric(metricName, hasValue(valueMatcher)));
}
private void createAndFillTestSimpleValuePage(long pageId) throws
Exception {
diff --git
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
index dcfcd8b8050..35b4c7de798 100644
---
a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
+++
b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/persistence/store/FilePageStoreIoTest.java
@@ -20,6 +20,9 @@ package
org.apache.ignite.internal.pagememory.persistence.store;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
+import static
org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasMetric;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
import static org.apache.ignite.internal.pagememory.PageIdAllocator.FLAG_DATA;
import static
org.apache.ignite.internal.pagememory.persistence.store.FilePageStore.VERSION_1;
import static
org.apache.ignite.internal.pagememory.persistence.store.TestPageStoreUtils.createPageByteBuffer;
@@ -29,20 +32,16 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Path;
-import java.util.Arrays;
import org.apache.ignite.internal.fileio.FileIo;
import org.apache.ignite.internal.fileio.FileIoFactory;
import org.apache.ignite.internal.fileio.MeteredFileIoFactory;
import org.apache.ignite.internal.fileio.RandomAccessFileIo;
import org.apache.ignite.internal.fileio.RandomAccessFileIoFactory;
-import org.apache.ignite.internal.metrics.DistributionMetric;
-import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.PageMemoryIoMetrics;
@@ -145,7 +144,7 @@ public class FilePageStoreIoTest extends
AbstractFilePageStoreIoTest {
// Verify write metrics were recorded - 1 write of header + 1
write of page
assertMetricValue(metricSet,
PageMemoryIoMetrics.TOTAL_BYTES_WRITTEN, PAGE_SIZE * 2);
- assertDistributionMetricRecordsCount(metricSet,
PageMemoryIoMetrics.WRITES_TIME, 2L);
+ assertDistributionMetricFromSet(metricSet,
PageMemoryIoMetrics.WRITES_TIME, 2L);
// Perform read operation
long pageOff = filePageStoreIo.pageOffset(pageId);
@@ -154,7 +153,7 @@ public class FilePageStoreIoTest extends
AbstractFilePageStoreIoTest {
// Verify read metrics were recorded
assertMetricValue(metricSet, PageMemoryIoMetrics.TOTAL_BYTES_READ,
PAGE_SIZE);
- assertDistributionMetricRecordsCount(metricSet,
PageMemoryIoMetrics.READS_TIME, 1L);
+ assertDistributionMetricFromSet(metricSet,
PageMemoryIoMetrics.READS_TIME, 1L);
}
}
@@ -176,28 +175,16 @@ public class FilePageStoreIoTest extends
AbstractFilePageStoreIoTest {
}
private static void assertMetricValue(MetricSet metrics, String
metricName, long value) {
- LongMetric metric = metrics.get(metricName);
-
- assertNotNull(metric, "Metric not found: " + metricName);
- assertEquals(value, metric.value(), metricName);
+ assertThat(metrics, hasMetric(
+ metricName,
+ hasValue(is(value))
+ ));
}
- /**
- * Verifies that the specified distribution metric has recorded the
expected total number of measurements.
- *
- * <p>
- * Rather than checking individual histogram buckets, this method
aggregates all recorded measurements across every bucket
- * and confirms that the expected interaction was captured in at least one
of them.
- */
- private static void assertDistributionMetricRecordsCount(MetricSet
metrics, String metricName, long expectedMeasuresCount) {
- DistributionMetric metric = metrics.get(metricName);
- assertNotNull(metric, metricName);
-
- long totalMeasuresCount = Arrays.stream(metric.value()).sum();
- assertThat(
- "Unexpected total measures count in distribution metric " +
metric.name(),
- totalMeasuresCount,
- is(expectedMeasuresCount)
- );
+ private static void assertDistributionMetricFromSet(MetricSet metrics,
String metricName, long expectedMeasuresCount) {
+ assertThat(metrics, hasMetric(
+ metricName,
+ hasMeasurementsCount(expectedMeasuresCount)
+ ));
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
index 88431a0cda9..15dd79642d7 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageEngine.java
@@ -64,6 +64,7 @@ import
org.apache.ignite.internal.storage.pagememory.configuration.schema.Persis
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryProfileView;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineConfiguration;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineExtensionConfiguration;
+import org.apache.ignite.internal.storage.pagememory.mv.RunConsistentlyMetrics;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.jetbrains.annotations.Nullable;
@@ -99,7 +100,7 @@ public class PersistentPageMemoryStorageEngine extends
AbstractPageMemoryStorage
private CollectionMetricSource checkpointMetricSource;
- private PersistentPageMemoryStorageMetricSource storageMetricSource;
+ private CollectionMetricSource storageMetricSource;
private final StorageConfiguration storageConfig;
@@ -130,6 +131,8 @@ public class PersistentPageMemoryStorageEngine extends
AbstractPageMemoryStorage
/** For unspecified tasks, i.e. throttling log. */
private final ExecutorService commonExecutorService;
+ private RunConsistentlyMetrics runConsistentlyMetrics;
+
/**
* Constructor.
*
@@ -253,10 +256,12 @@ public class PersistentPageMemoryStorageEngine extends
AbstractPageMemoryStorage
destructionExecutor = executor;
- storageMetricSource = new
PersistentPageMemoryStorageMetricSource("storage." + ENGINE_NAME);
+ storageMetricSource = new CollectionMetricSource("storage." +
ENGINE_NAME, "storage", null);
PersistentPageMemoryStorageMetrics.initMetrics(storageMetricSource,
filePageStoreManager);
+ runConsistentlyMetrics = new
RunConsistentlyMetrics(storageMetricSource);
+
metricManager.registerSource(checkpointMetricSource);
metricManager.registerSource(storageMetricSource);
metricManager.registerSource(ioMetricSource);
@@ -332,7 +337,8 @@ public class PersistentPageMemoryStorageEngine extends
AbstractPageMemoryStorage
this,
dataRegion,
destructionExecutor,
- failureManager
+ failureManager,
+ runConsistentlyMetrics
);
dataRegion.addTableStorage(tableStorage);
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricSource.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricSource.java
deleted file mode 100644
index 0d813d0af98..00000000000
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricSource.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.storage.pagememory;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.metrics.Metric;
-import org.apache.ignite.internal.metrics.MetricSet;
-import org.apache.ignite.internal.metrics.MetricSource;
-import org.jetbrains.annotations.Nullable;
-
-/** Persistent page memory storage metric source. */
-class PersistentPageMemoryStorageMetricSource implements MetricSource {
- private final String name;
-
- /** Metrics map. Only modified in {@code synchronized} context. */
- private final Map<String, Metric> metrics = new HashMap<>();
-
- /** Enabled flag. Only modified in {@code synchronized} context. */
- private boolean enabled;
-
- /**
- * Constructor.
- *
- * @param name Metric set name.
- */
- PersistentPageMemoryStorageMetricSource(String name) {
- this.name = name;
- }
-
- @Override
- public String name() {
- return name;
- }
-
- @Override
- public @Nullable String group() {
- return "storage";
- }
-
- /** Adds metric to the source. */
- synchronized <T extends Metric> T addMetric(T metric) {
- assert !enabled : "Cannot add metrics when source is enabled";
-
- metrics.put(metric.name(), metric);
-
- return metric;
- }
-
- @Override
- public synchronized @Nullable MetricSet enable() {
- if (enabled) {
- return null;
- }
-
- enabled = true;
-
- return new MetricSet(name, description(), group(),
Map.copyOf(metrics));
- }
-
- @Override
- public synchronized void disable() {
- enabled = false;
- }
-
- @Override
- public synchronized boolean enabled() {
- return enabled;
- }
-}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
index 13a6bfbe31a..6a8d847b476 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetrics.java
@@ -21,6 +21,7 @@ import
org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import
org.apache.ignite.internal.pagememory.persistence.store.GroupPageStoresMap.GroupPartitionPageStore;
@@ -31,7 +32,7 @@ class PersistentPageMemoryStorageMetrics {
/** Initializes metrics in the given metric source. */
static void initMetrics(
- PersistentPageMemoryStorageMetricSource source,
+ CollectionMetricSource source,
FilePageStoreManager filePageStoreManager
) {
source.addMetric(new LongGauge(
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
index 7385839d8c7..426aecca791 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryTableStorage.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.storage.index.StorageIndexDescriptorSupplier;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage;
import
org.apache.ignite.internal.storage.pagememory.mv.PersistentPageMemoryMvPartitionStorage;
+import org.apache.ignite.internal.storage.pagememory.mv.RunConsistentlyMetrics;
import org.apache.ignite.internal.storage.pagememory.mv.VersionChainTree;
import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
import org.jetbrains.annotations.Nullable;
@@ -65,6 +66,8 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
private final FailureProcessor failureProcessor;
+ private final RunConsistentlyMetrics runConsistentlyMetrics;
+
/**
* Constructor.
*
@@ -72,7 +75,9 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
* @param indexDescriptorSupplier Index descriptor supplier.
* @param engine Storage engine instance.
* @param dataRegion Data region for the table.
+ * @param destructionExecutor Executor service for destruction tasks.
* @param failureProcessor Failure processor.
+ * @param runConsistentlyMetrics RunConsistently metrics.
*/
public PersistentPageMemoryTableStorage(
StorageTableDescriptor tableDescriptor,
@@ -80,7 +85,8 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
PersistentPageMemoryStorageEngine engine,
PersistentPageMemoryDataRegion dataRegion,
ExecutorService destructionExecutor,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ RunConsistentlyMetrics runConsistentlyMetrics
) {
super(tableDescriptor, indexDescriptorSupplier);
@@ -88,6 +94,7 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
this.dataRegion = dataRegion;
this.destructionExecutor = destructionExecutor;
this.failureProcessor = failureProcessor;
+ this.runConsistentlyMetrics = runConsistentlyMetrics;
}
@Override
@@ -144,7 +151,8 @@ public class PersistentPageMemoryTableStorage extends
AbstractPageMemoryTableSto
indexMetaTree,
gcQueue,
destructionExecutor,
- failureProcessor
+ failureProcessor,
+ runConsistentlyMetrics
);
});
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 8e46826f286..8b10f12744b 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.storage.util.LocalLocker;
import org.apache.ignite.internal.util.ByteUtils;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Implementation of {@link MvPartitionStorage} based on a {@link BplusTree}
for persistent case.
@@ -99,6 +100,9 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
*/
private final Object leaseInfoLock = new Object();
+ /** RunConsistently metrics. */
+ private final RunConsistentlyMetrics runConsistentlyMetrics;
+
/**
* Constructor.
*
@@ -110,6 +114,7 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
* @param indexMetaTree Tree that contains SQL indexes' metadata.
* @param gcQueue Garbage collection queue.
* @param failureProcessor Failure processor.
+ * @param runConsistentlyMetrics Metric source for runConsistently
operations.
*/
public PersistentPageMemoryMvPartitionStorage(
PersistentPageMemoryTableStorage tableStorage,
@@ -120,7 +125,8 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
IndexMetaTree indexMetaTree,
GcQueue gcQueue,
ExecutorService destructionExecutor,
- FailureProcessor failureProcessor
+ FailureProcessor failureProcessor,
+ RunConsistentlyMetrics runConsistentlyMetrics
) {
super(
partitionId,
@@ -166,6 +172,8 @@ public class PersistentPageMemoryMvPartitionStorage extends
AbstractPageMemoryMv
);
leaseInfo = leaseInfoFromMeta();
+
+ this.runConsistentlyMetrics = runConsistentlyMetrics;
}
/**
@@ -193,26 +201,44 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
return busy(() -> {
throwExceptionIfStorageNotInRunnableOrRebalanceState(state.get(),
this::createStorageInfo);
- LocalLocker locker0 = new PersistentPageMemoryLocker();
-
- checkpointTimeoutLock.checkpointReadLock();
+ boolean metricsEnabled = runConsistentlyMetrics.enabled();
+ long startTime = metricsEnabled ? System.nanoTime() : 0;
- THREAD_LOCAL_LOCKER.set(locker0);
+ if (metricsEnabled) {
+ runConsistentlyMetrics.onRunConsistentlyStarted();
+ }
try {
- return closure.execute(locker0);
+ return executeRunConsistently(closure);
} finally {
- THREAD_LOCAL_LOCKER.set(null);
-
- // Can't throw any exception, it's safe to do it without
try/finally.
- locker0.unlockAll();
-
- checkpointTimeoutLock.checkpointReadUnlock();
+ if (metricsEnabled) {
+
runConsistentlyMetrics.recordRunConsistentlyDuration(System.nanoTime() -
startTime);
+ runConsistentlyMetrics.onRunConsistentlyFinished();
+ }
}
});
}
}
+ private <V> V executeRunConsistently(WriteClosure<V> closure) {
+ LocalLocker locker0 = new PersistentPageMemoryLocker();
+
+ checkpointTimeoutLock.checkpointReadLock();
+
+ THREAD_LOCAL_LOCKER.set(locker0);
+
+ try {
+ return closure.execute(locker0);
+ } finally {
+ THREAD_LOCAL_LOCKER.set(null);
+
+ // Can't throw any exception, it's safe to do it without
try/finally.
+ locker0.unlockAll();
+
+ checkpointTimeoutLock.checkpointReadUnlock();
+ }
+ }
+
@Override
public CompletableFuture<Void> flush(boolean trigger) {
return busy(() -> {
@@ -437,6 +463,12 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
return wiHeadLock.isHeldByCurrentThread();
}
+ /** Returns the runConsistently metrics for testing. */
+ @TestOnly
+ RunConsistentlyMetrics runConsistentlyMetrics() {
+ return runConsistentlyMetrics;
+ }
+
@Override
public @Nullable LeaseInfo leaseInfo() {
return busy(() -> {
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RunConsistentlyMetrics.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RunConsistentlyMetrics.java
new file mode 100644
index 00000000000..6cc8fce77f6
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/RunConsistentlyMetrics.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.LongAdder;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.LongAdderMetric;
+import org.apache.ignite.internal.metrics.LongGauge;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
+import org.jetbrains.annotations.TestOnly;
+
+/**
+ * Metrics for runConsistently operation.
+ *
+ * <p>Tracks runConsistently closure execution performance including duration,
+ * active call count, and total invocation count.
+ */
+public class RunConsistentlyMetrics {
+ /** Histogram bucket bounds for runConsistently duration in nanoseconds. */
+ private static final long[] RUN_CONSISTENTLY_DURATION_BOUNDS = {
+ TimeUnit.MICROSECONDS.toNanos(10),
+ TimeUnit.MICROSECONDS.toNanos(100),
+ TimeUnit.MILLISECONDS.toNanos(1),
+ TimeUnit.MILLISECONDS.toNanos(10),
+ TimeUnit.MILLISECONDS.toNanos(100),
+ TimeUnit.SECONDS.toNanos(1),
+ TimeUnit.SECONDS.toNanos(10),
+ };
+
+ private final DistributionMetric runConsistentlyDuration;
+ private final LongAdderMetric runConsistentlyStarted;
+ private final LongAdder exitCount = new LongAdder();
+ private final LongGauge runConsistentlyActiveCount;
+
+ private final CollectionMetricSource metricSource;
+
+ /**
+ * Constructor.
+ *
+ * @param metricSource Metric source to register metrics with.
+ */
+ public RunConsistentlyMetrics(CollectionMetricSource metricSource) {
+ this.metricSource = metricSource;
+
+ runConsistentlyDuration = metricSource.addMetric(new
DistributionMetric(
+ "RunConsistentlyDuration",
+ "Time spent in runConsistently closures in nanoseconds.",
+ RUN_CONSISTENTLY_DURATION_BOUNDS
+ ));
+
+ runConsistentlyStarted = metricSource.addMetric(new LongAdderMetric(
+ "RunConsistentlyStarted",
+ "Total number of runConsistently invocations started."
+ ));
+
+ runConsistentlyActiveCount = metricSource.addMetric(new LongGauge(
+ "RunConsistentlyActiveCount",
+ "Current number of active runConsistently calls.",
+ () -> runConsistentlyStarted.value() - exitCount.sum()
+ ));
+ }
+
+ /**
+ * Returns {@code true} if the metric source is enabled.
+ */
+ public boolean enabled() {
+ return metricSource.enabled();
+ }
+
+ /**
+ * Records the duration of a runConsistently closure execution in
nanoseconds.
+ */
+ public void recordRunConsistentlyDuration(long durationNanos) {
+ runConsistentlyDuration.add(durationNanos);
+ }
+
+ /**
+ * Records the start of a runConsistently invocation.
+ */
+ public void onRunConsistentlyStarted() {
+ runConsistentlyStarted.increment();
+ }
+
+ /**
+ * Records the completion of a runConsistently invocation.
+ */
+ public void onRunConsistentlyFinished() {
+ exitCount.increment();
+ }
+
+ /** Returns the runConsistently duration metric for testing. */
+ @TestOnly
+ DistributionMetric runConsistentlyDuration() {
+ return runConsistentlyDuration;
+ }
+
+ /** Returns the runConsistently started count metric for testing. */
+ @TestOnly
+ LongAdderMetric runConsistentlyStarted() {
+ return runConsistentlyStarted;
+ }
+
+ /** Returns the runConsistently active count metric for testing. */
+ @TestOnly
+ LongMetric runConsistentlyActiveCount() {
+ return runConsistentlyActiveCount;
+ }
+}
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
index 434dce47e91..411c615f919 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/PersistentPageMemoryStorageMetricsTest.java
@@ -32,6 +32,7 @@ import org.apache.ignite.internal.metrics.Metric;
import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricSet;
import org.apache.ignite.internal.metrics.TestMetricManager;
+import org.apache.ignite.internal.pagememory.metrics.CollectionMetricSource;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import
org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
@@ -45,7 +46,7 @@ import org.junit.jupiter.api.Test;
public class PersistentPageMemoryStorageMetricsTest extends
BaseIgniteAbstractTest {
private final MetricManager metricManager = new TestMetricManager();
- private final PersistentPageMemoryStorageMetricSource metricSource = new
PersistentPageMemoryStorageMetricSource("test");
+ private final CollectionMetricSource metricSource = new
CollectionMetricSource("test", "storage", null);
private final FilePageStoreManager filePageStoreManager =
mock(FilePageStoreManager.class);
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
index 9a6cbe78c49..9513fbe8266 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/FailedCheckpointTest.java
@@ -232,13 +232,20 @@ public class FailedCheckpointTest extends
BaseMvStoragesTest {
}
private MvTableStorage createMvTableStorage() {
+ CollectionMetricSource metricSource = new CollectionMetricSource(
+ "storage.test.consistency",
+ "storage",
+ null
+ );
+
var tableStorage = new PersistentPageMemoryTableStorage(
new StorageTableDescriptor(1, DEFAULT_PARTITION_COUNT,
DEFAULT_STORAGE_PROFILE),
mock(StorageIndexDescriptorSupplier.class),
mockEngine,
dataRegion,
destructionExecutor,
- mock(FailureManager.class)
+ mock(FailureManager.class),
+ new RunConsistentlyMetrics(metricSource)
);
dataRegion.addTableStorage(tableStorage);
diff --git
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
index 03c1f2ffa1c..0cd71d91939 100644
---
a/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
+++
b/modules/storage-page-memory/src/test/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorageTest.java
@@ -19,6 +19,8 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
+import static
org.apache.ignite.internal.metrics.MetricMatchers.hasMeasurementsCount;
+import static org.apache.ignite.internal.metrics.MetricMatchers.hasValue;
import static
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointState.FINISHED;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.isRow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -45,7 +47,8 @@ import
org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.metrics.MetricManager;
+import org.apache.ignite.internal.metrics.LongMetric;
+import org.apache.ignite.internal.metrics.TestMetricManager;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageClosedException;
@@ -91,7 +94,7 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
engine = new PersistentPageMemoryStorageEngine(
"test",
- mock(MetricManager.class),
+ new TestMetricManager(),
storageConfig,
systemConfig,
ioRegistry,
@@ -537,4 +540,71 @@ class PersistentPageMemoryMvPartitionStorageTest extends
AbstractPageMemoryMvPar
assertThrows(StorageClosedException.class, cursor::next);
}
}
+
+ @Test
+ void verifyRunConsistentlyMetrics() {
+ RunConsistentlyMetrics metrics =
((PersistentPageMemoryMvPartitionStorage) storage).runConsistentlyMetrics();
+
+ // Verify metrics start at zero
+ assertThat(metrics.runConsistentlyDuration(),
hasMeasurementsCount(0L));
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 0);
+ assertMetricValue(metrics.runConsistentlyStarted(), 0);
+
+ // Execute a simple operation within runConsistently
+ storage.runConsistently(locker -> {
+ // Verify active count is incremented during execution
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 1);
+ assertMetricValue(metrics.runConsistentlyStarted(), 1);
+
+ return null;
+ });
+
+ // Verify duration was recorded
+ assertThat(metrics.runConsistentlyDuration(),
hasMeasurementsCount(1L));
+
+ // Verify active count is back to zero
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 0);
+ assertMetricValue(metrics.runConsistentlyStarted(), 1);
+
+ // Execute another operation
+ storage.runConsistently(locker -> {
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 1);
+
+ return null;
+ });
+
+ // Verify counters after second invocation
+ assertThat(metrics.runConsistentlyDuration(),
hasMeasurementsCount(2L));
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 0);
+ assertMetricValue(metrics.runConsistentlyStarted(), 2);
+ }
+
+ @Test
+ void verifyNestedRunConsistentlyDoesNotDoubleCountMetrics() {
+ RunConsistentlyMetrics metrics =
((PersistentPageMemoryMvPartitionStorage) storage).runConsistentlyMetrics();
+
+ storage.runConsistently(outerLocker -> {
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 1);
+ assertMetricValue(metrics.runConsistentlyStarted(), 1);
+
+ // Nested call - takes fast path, no metrics recorded
+ storage.runConsistently(innerLocker -> {
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 1);
+ assertMetricValue(metrics.runConsistentlyStarted(), 1);
+
+ return null;
+ });
+
+ return null;
+ });
+
+ // Only one entry should be recorded for the outer call
+ assertThat(metrics.runConsistentlyDuration(),
hasMeasurementsCount(1L));
+ assertMetricValue(metrics.runConsistentlyActiveCount(), 0);
+ assertMetricValue(metrics.runConsistentlyStarted(), 1);
+ }
+
+ private static void assertMetricValue(LongMetric metric, long value) {
+ assertThat(metric, hasValue(is(value)));
+ }
}