This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 26516f65841 IGNITE-26122 Add metric for clock drift (#6440)
26516f65841 is described below

commit 26516f65841bebd7810be7f30eb1e6d0e1c76484
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Aug 19 15:57:45 2025 +0300

    IGNITE-26122 Add metric for clock drift (#6440)
---
 .../ignite/internal/cli/CliIntegrationTest.java    |  3 +-
 .../ignite/internal/hlc/ClockServiceImpl.java      | 17 ++++-
 .../ignite/internal/hlc/ClockServiceTest.java      |  7 +-
 .../rebalance/ItRebalanceDistributedTest.java      |  3 +-
 .../impl/ItIdempotentCommandCacheTest.java         |  3 +-
 .../metrics/sources/ClockServiceMetricSource.java  | 74 +++++++++++++++++++
 .../sources/ClockServiceMetricSourceTest.java      | 85 ++++++++++++++++++++++
 .../partition/replicator/fixtures/Node.java        |  3 +-
 .../rest/metrics/ItMetricControllerTest.java       |  3 +-
 .../runner/app/ItIgniteNodeRestartTest.java        |  3 +-
 .../org/apache/ignite/internal/app/IgniteImpl.java | 15 +++-
 .../storage/InternalTableEstimatedSizeTest.java    |  7 +-
 12 files changed, 211 insertions(+), 12 deletions(-)

diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 0dc2de7ca31..87d076a43dc 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -79,7 +79,8 @@ public abstract class CliIntegrationTest extends ClusterPerClassIntegrationTest
             new MetricSource().name("transactions").enabled(true),
             new MetricSource().name("placement-driver").enabled(true),
             new MetricSource().name("resource.vacuum").enabled(true),
-            new MetricSource().name("zones.Default").enabled(true)
+            new MetricSource().name("zones.Default").enabled(true),
+            new MetricSource().name("clock.service").enabled(true)
     };
 
     /** Correct ignite jdbc url. */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
index e486f3afa6b..e96271dbf80 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.hlc;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
@@ -30,18 +31,24 @@ public class ClockServiceImpl implements ClockService {
 
     private final HybridClock clock;
     private final ClockWaiter clockWaiter;
-
     private final LongSupplier maxClockSkewMsSupplier;
+    private final Consumer<Long> onMaxClockSkewExceededClosure;
 
     private volatile long maxClockSkewMillis = -1;
 
     /**
      * Constructor.
      */
-    public ClockServiceImpl(HybridClock clock, ClockWaiter clockWaiter, LongSupplier maxClockSkewMsSupplier) {
+    public ClockServiceImpl(
+            HybridClock clock,
+            ClockWaiter clockWaiter,
+            LongSupplier maxClockSkewMsSupplier,
+            Consumer<Long> onMaxClockSkewExceededClosure
+    ) {
         this.clock = clock;
         this.clockWaiter = clockWaiter;
         this.maxClockSkewMsSupplier = maxClockSkewMsSupplier;
+        this.onMaxClockSkewExceededClosure = onMaxClockSkewExceededClosure;
     }
 
     @Override
@@ -70,9 +77,13 @@ public class ClockServiceImpl implements ClockService {
         // However, since benchmarks did not show any noticeable performance penalty due to the aforementioned call duplication,
         // design purity was prioritized over call redundancy.
         HybridTimestamp currentLocalTimestamp = clock.current();
-        if (requestTime.getPhysical() - maxClockSkewMillis() > currentLocalTimestamp.getPhysical()) {
+        long requestTimePhysical = requestTime.getPhysical();
+        long currentLocalTimePhysical = currentLocalTimestamp.getPhysical();
+
+        if (requestTimePhysical - maxClockSkewMillis() > currentLocalTimePhysical) {
             log.warn("Maximum allowed clock drift exceeded [requestTime={}, localTime={}, maxClockSkew={}]", requestTime,
                     currentLocalTimestamp, maxClockSkewMillis());
+            onMaxClockSkewExceededClosure.accept(requestTimePhysical - currentLocalTimePhysical);
         }
 
         return clock.update(requestTime);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java b/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
index c7ccc356bcd..47fe4925f98 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
@@ -51,7 +51,12 @@ public class ClockServiceTest extends BaseIgniteAbstractTest {
     @Test
     public void testMaximumAllowedClockDriftExceededIsPrinted() {
         HybridClock clock = new HybridClockImpl();
-        ClockServiceImpl clockService = new ClockServiceImpl(clock, clockWaiter, () -> MAX_CLOCK_SKEW_MILLIS);
+        ClockServiceImpl clockService = new ClockServiceImpl(
+                clock,
+                clockWaiter,
+                () -> MAX_CLOCK_SKEW_MILLIS,
+                skew -> {}
+        );
 
         // Check that request time less than max clock skew won't trigger log warning.
         clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS / 2));
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 0a98d8f1aca..b2a4b9b3afd 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1426,7 +1426,8 @@ public class ItRebalanceDistributedTest extends BaseIgniteAbstractTest {
             ClockService clockService = new ClockServiceImpl(
                     hybridClock,
                     clockWaiter,
-                    () -> DEFAULT_MAX_CLOCK_SKEW_MS
+                    () -> DEFAULT_MAX_CLOCK_SKEW_MS,
+                    skew -> {}
             );
 
             ReplicaService replicaSvc = new ReplicaService(
diff --git a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 66544e739cc..b4b37fbf26a 100644
--- a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++ b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -245,7 +245,8 @@ public class ItIdempotentCommandCacheTest extends IgniteAbstractTest {
             clockService = new ClockServiceImpl(
                     clock,
                     clockWaiter,
-                    () -> TEST_MAX_CLOCK_SKEW_MILLIS
+                    () -> TEST_MAX_CLOCK_SKEW_MILLIS,
+                    skew -> {}
             );
         }
 
diff --git a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSource.java b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSource.java
new file mode 100644
index 00000000000..edfb9893f37
--- /dev/null
+++ b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.sources;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import org.apache.ignite.internal.metrics.sources.ClockServiceMetricSource.Holder;
+
+/**
+ * Metric source, which provides clock service metrics.
+ */
+public class ClockServiceMetricSource extends AbstractMetricSource<Holder> {
+    /** Source name. */
+    public static final String SOURCE_NAME = "clock.service";
+
+    /** Histogram buckets for clock skew in milliseconds. */
+    private static final long[] HISTOGRAM_BUCKETS =
+            {1, 2, 4, 8, 16, 25, 50, 75, 100, 250, 500, 750, 1000, 3000, 5000, 10000, 25000, 60000};
+
+    /**
+     * Creates a new instance of {@link ClockServiceMetricSource}.
+     */
+    public ClockServiceMetricSource() {
+        super(SOURCE_NAME, "Clock service metrics.");
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder();
+    }
+
+    /**
+     * Closure to be called when clock skew exceeded max clock skew.
+     *
+     * @param observedClockSkew Observed max clock skew.
+     */
+    public void onMaxClockSkewExceeded(long observedClockSkew) {
+        Holder holder = holder();
+
+        if (holder != null) {
+            holder.clockSkewExceedingMaxClockSkew.add(observedClockSkew);
+        }
+    }
+
+    /** Holder class. */
+    protected static class Holder implements AbstractMetricSource.Holder<Holder> {
+        private final DistributionMetric clockSkewExceedingMaxClockSkew = new DistributionMetric(
+                "ClockSkewExceedingMaxClockSkew",
+                "Observed clock skew that exceeded max clock skew.",
+                HISTOGRAM_BUCKETS);
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return List.of(clockSkewExceedingMaxClockSkew);
+        }
+    }
+}
diff --git a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSourceTest.java b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSourceTest.java
new file mode 100644
index 00000000000..9f1512f1902
--- /dev/null
+++ b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSourceTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.sources;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.MetricRegistry;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Test for clock service metric source.
+ */
+public class ClockServiceMetricSourceTest extends BaseIgniteAbstractTest {
+    private static final long MAX_CLOCK_SKEW_MILLIS = 100;
+    @Mock
+    private ClockWaiter clockWaiter;
+
+    @Test
+    public void test() throws Exception {
+        MetricRegistry registry = new MetricRegistry();
+        HybridClock clock = new HybridClockImpl();
+        ClockServiceMetricSource clockServiceMetricSource = new ClockServiceMetricSource();
+
+        ClockServiceImpl clockService = new ClockServiceImpl(
+                clock,
+                clockWaiter,
+                () -> MAX_CLOCK_SKEW_MILLIS,
+                clockServiceMetricSource::onMaxClockSkewExceeded
+        );
+        registry.registerSource(clockServiceMetricSource);
+
+        MetricSet metricSet = registry.enable(clockServiceMetricSource);
+        assertNotNull(metricSet);
+
+        DistributionMetric clockSkewExceedingMaxClockSkew = metricSet.get("ClockSkewExceedingMaxClockSkew");
+        assertNotNull(clockSkewExceedingMaxClockSkew);
+
+        for (int i = 0; i < clockSkewExceedingMaxClockSkew.value().length; i++) {
+            assertEquals(0L, clockSkewExceedingMaxClockSkew.value()[i]);
+        }
+
+        // Less than max clock skew, should not upgrade the metric.
+        clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS / 2));
+
+        // Should update bucket number 9
+        clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS * 2));
+
+        // Should update bucket number 12
+        clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS * 10));
+        clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS * 10));
+
+        assertEquals(1L, clockSkewExceedingMaxClockSkew.value()[9]);
+        assertEquals(2L, clockSkewExceedingMaxClockSkew.value()[12]);
+
+        for (int i = 0; i < clockSkewExceedingMaxClockSkew.value().length; i++) {
+            if (i != 9 && i != 12) {
+                assertEquals(0L, clockSkewExceedingMaxClockSkew.value()[i]);
+            }
+        }
+    }
+}
diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 72182ea9258..84122d98061 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -528,7 +528,8 @@ public class Node {
         var clockService = new ClockServiceImpl(
                 hybridClock,
                 clockWaiter,
-                () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS
+                () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS,
+                skew -> {}
         );
 
         ReplicaService replicaSvc = new ReplicaService(
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 2dc47c1b2f9..4af0ab6230c 100644
--- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -66,7 +66,8 @@ class ItMetricControllerTest extends ClusterPerClassIntegrationTest {
             new MetricSource("transactions", true),
             new MetricSource("resource.vacuum", true),
             new MetricSource("placement-driver", true),
-            new MetricSource("zones.Default", true)
+            new MetricSource("zones.Default", true),
+            new MetricSource("clock.service", true)
     };
 
     @Inject
diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 586c8e6e04e..fe38c043d51 100644
--- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -565,7 +565,8 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest {
         ClockService clockService = new ClockServiceImpl(
                 hybridClock,
                 clockWaiter,
-                () -> schemaSyncConfiguration.maxClockSkewMillis().value()
+                () -> schemaSyncConfiguration.maxClockSkewMillis().value(),
+                skew -> {}
         );
 
         var lowWatermark = new LowWatermarkImpl(
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c4e079af912..4eca1ae0a8d 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -170,6 +170,7 @@ import org.apache.ignite.internal.metrics.MetricManager;
 import org.apache.ignite.internal.metrics.MetricManagerImpl;
 import org.apache.ignite.internal.metrics.configuration.MetricExtensionConfiguration;
 import org.apache.ignite.internal.metrics.messaging.MetricMessaging;
+import org.apache.ignite.internal.metrics.sources.ClockServiceMetricSource;
 import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
 import org.apache.ignite.internal.metrics.sources.OsMetricSource;
 import org.apache.ignite.internal.network.ChannelType;
@@ -508,6 +509,8 @@ public class IgniteImpl implements Ignite {
     @Nullable
     private volatile ClusterState clusterState;
 
+    private final ClockServiceMetricSource clockServiceMetricSource;
+
     /**
      * The Constructor.
      *
@@ -829,7 +832,14 @@ public class IgniteImpl implements Ignite {
         SchemaSynchronizationConfiguration schemaSyncConfig = clusterConfigRegistry
                 .getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();
 
-        clockService = new ClockServiceImpl(clock, clockWaiter, () -> schemaSyncConfig.maxClockSkewMillis().value());
+        clockServiceMetricSource = new ClockServiceMetricSource();
+
+        clockService = new ClockServiceImpl(
+                clock,
+                clockWaiter,
+                () -> schemaSyncConfig.maxClockSkewMillis().value(),
+                clockServiceMetricSource::onMaxClockSkewExceeded
+        );
 
         idempotentCacheVacuumizer = new IdempotentCacheVacuumizer(
                 name,
@@ -1463,6 +1473,9 @@ public class IgniteImpl implements Ignite {
             metricManager.registerSource(osMetrics);
             metricManager.enable(osMetrics);
 
+            metricManager.registerSource(clockServiceMetricSource);
+            metricManager.enable(clockServiceMetricSource);
+
             // Start the components that are required to join the cluster.
             // TODO https://issues.apache.org/jira/browse/IGNITE-22570
             CompletableFuture<Void> componentsStartFuture = lifecycleManager.startComponentsAsync(
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index baca5351f23..1482ee20184 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -210,7 +210,12 @@ public class InternalTableEstimatedSizeTest extends BaseIgniteAbstractTest {
 
         node = clusterService.topologyService().localMember();
 
-        var clockService = new ClockServiceImpl(clock, clockWaiter, () -> 0);
+        var clockService = new ClockServiceImpl(
+                clock,
+                clockWaiter,
+                () -> 0,
+                skew -> {}
+        );
 
         table = new InternalTableImpl(
                 
QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, TABLE_NAME),

Reply via email to