This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 26516f65841 IGNITE-26122 Add metric for clock drift (#6440)
26516f65841 is described below
commit 26516f65841bebd7810be7f30eb1e6d0e1c76484
Author: Alexander Lapin <[email protected]>
AuthorDate: Tue Aug 19 15:57:45 2025 +0300
IGNITE-26122 Add metric for clock drift (#6440)
---
.../ignite/internal/cli/CliIntegrationTest.java | 3 +-
.../ignite/internal/hlc/ClockServiceImpl.java | 17 ++++-
.../ignite/internal/hlc/ClockServiceTest.java | 7 +-
.../rebalance/ItRebalanceDistributedTest.java | 3 +-
.../impl/ItIdempotentCommandCacheTest.java | 3 +-
.../metrics/sources/ClockServiceMetricSource.java | 74 +++++++++++++++++++
.../sources/ClockServiceMetricSourceTest.java | 85 ++++++++++++++++++++++
.../partition/replicator/fixtures/Node.java | 3 +-
.../rest/metrics/ItMetricControllerTest.java | 3 +-
.../runner/app/ItIgniteNodeRestartTest.java | 3 +-
.../org/apache/ignite/internal/app/IgniteImpl.java | 15 +++-
.../storage/InternalTableEstimatedSizeTest.java | 7 +-
12 files changed, 211 insertions(+), 12 deletions(-)
diff --git
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index 0dc2de7ca31..87d076a43dc 100644
---
a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++
b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -79,7 +79,8 @@ public abstract class CliIntegrationTest extends
ClusterPerClassIntegrationTest
new MetricSource().name("transactions").enabled(true),
new MetricSource().name("placement-driver").enabled(true),
new MetricSource().name("resource.vacuum").enabled(true),
- new MetricSource().name("zones.Default").enabled(true)
+ new MetricSource().name("zones.Default").enabled(true),
+ new MetricSource().name("clock.service").enabled(true)
};
/** Correct ignite jdbc url. */
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
index e486f3afa6b..e96271dbf80 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/hlc/ClockServiceImpl.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.hlc;
import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
@@ -30,18 +31,24 @@ public class ClockServiceImpl implements ClockService {
private final HybridClock clock;
private final ClockWaiter clockWaiter;
-
private final LongSupplier maxClockSkewMsSupplier;
+ private final Consumer<Long> onMaxClockSkewExceededClosure;
private volatile long maxClockSkewMillis = -1;
/**
* Constructor.
*/
- public ClockServiceImpl(HybridClock clock, ClockWaiter clockWaiter,
LongSupplier maxClockSkewMsSupplier) {
+ public ClockServiceImpl(
+ HybridClock clock,
+ ClockWaiter clockWaiter,
+ LongSupplier maxClockSkewMsSupplier,
+ Consumer<Long> onMaxClockSkewExceededClosure
+ ) {
this.clock = clock;
this.clockWaiter = clockWaiter;
this.maxClockSkewMsSupplier = maxClockSkewMsSupplier;
+ this.onMaxClockSkewExceededClosure = onMaxClockSkewExceededClosure;
}
@Override
@@ -70,9 +77,13 @@ public class ClockServiceImpl implements ClockService {
// However, since benchmarks did not show any noticeable performance
penalty due to the aforementioned call duplication,
// design purity was prioritized over call redundancy.
HybridTimestamp currentLocalTimestamp = clock.current();
- if (requestTime.getPhysical() - maxClockSkewMillis() >
currentLocalTimestamp.getPhysical()) {
+ long requestTimePhysical = requestTime.getPhysical();
+ long currentLocalTimePhysical = currentLocalTimestamp.getPhysical();
+
+ if (requestTimePhysical - maxClockSkewMillis() >
currentLocalTimePhysical) {
log.warn("Maximum allowed clock drift exceeded [requestTime={},
localTime={}, maxClockSkew={}]", requestTime,
currentLocalTimestamp, maxClockSkewMillis());
+ onMaxClockSkewExceededClosure.accept(requestTimePhysical -
currentLocalTimePhysical);
}
return clock.update(requestTime);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
index c7ccc356bcd..47fe4925f98 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/hlc/ClockServiceTest.java
@@ -51,7 +51,12 @@ public class ClockServiceTest extends BaseIgniteAbstractTest
{
@Test
public void testMaximumAllowedClockDriftExceededIsPrinted() {
HybridClock clock = new HybridClockImpl();
- ClockServiceImpl clockService = new ClockServiceImpl(clock,
clockWaiter, () -> MAX_CLOCK_SKEW_MILLIS);
+ ClockServiceImpl clockService = new ClockServiceImpl(
+ clock,
+ clockWaiter,
+ () -> MAX_CLOCK_SKEW_MILLIS,
+ skew -> {}
+ );
// Check that request time less than max clock skew won't trigger log
warning.
clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS
/ 2));
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 0a98d8f1aca..b2a4b9b3afd 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -1426,7 +1426,8 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
ClockService clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
- () -> DEFAULT_MAX_CLOCK_SKEW_MS
+ () -> DEFAULT_MAX_CLOCK_SKEW_MS,
+ skew -> {}
);
ReplicaService replicaSvc = new ReplicaService(
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
index 66544e739cc..b4b37fbf26a 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItIdempotentCommandCacheTest.java
@@ -245,7 +245,8 @@ public class ItIdempotentCommandCacheTest extends
IgniteAbstractTest {
clockService = new ClockServiceImpl(
clock,
clockWaiter,
- () -> TEST_MAX_CLOCK_SKEW_MILLIS
+ () -> TEST_MAX_CLOCK_SKEW_MILLIS,
+ skew -> {}
);
}
diff --git
a/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSource.java
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSource.java
new file mode 100644
index 00000000000..edfb9893f37
--- /dev/null
+++
b/modules/metrics/src/main/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSource.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.sources;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.Metric;
+import
org.apache.ignite.internal.metrics.sources.ClockServiceMetricSource.Holder;
+
+/**
+ * Metric source that provides clock service metrics.
+ */
+public class ClockServiceMetricSource extends AbstractMetricSource<Holder> {
+ /** Source name. */
+ public static final String SOURCE_NAME = "clock.service";
+
+ /** Histogram buckets for clock skew in milliseconds. */
+ private static final long[] HISTOGRAM_BUCKETS =
+ {1, 2, 4, 8, 16, 25, 50, 75, 100, 250, 500, 750, 1000, 3000, 5000,
10000, 25000, 60000};
+
+ /**
+ * Creates a new instance of {@link ClockServiceMetricSource}.
+ */
+ public ClockServiceMetricSource() {
+ super(SOURCE_NAME, "Clock service metrics.");
+ }
+
+ @Override
+ protected Holder createHolder() {
+ return new Holder();
+ }
+
+ /**
+ * Callback invoked when an observed clock skew exceeds the maximum allowed clock skew.
+ *
+ * @param observedClockSkew Observed clock skew, in milliseconds, that exceeded the maximum allowed clock skew.
+ */
+ public void onMaxClockSkewExceeded(long observedClockSkew) {
+ Holder holder = holder();
+
+ if (holder != null) {
+ holder.clockSkewExceedingMaxClockSkew.add(observedClockSkew);
+ }
+ }
+
+ /** Holder class. */
+ protected static class Holder implements
AbstractMetricSource.Holder<Holder> {
+ private final DistributionMetric clockSkewExceedingMaxClockSkew = new
DistributionMetric(
+ "ClockSkewExceedingMaxClockSkew",
+ "Observed clock skew that exceeded max clock skew.",
+ HISTOGRAM_BUCKETS);
+
+ @Override
+ public Iterable<Metric> metrics() {
+ return List.of(clockSkewExceedingMaxClockSkew);
+ }
+ }
+}
diff --git
a/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSourceTest.java
b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSourceTest.java
new file mode 100644
index 00000000000..9f1512f1902
--- /dev/null
+++
b/modules/metrics/src/test/java/org/apache/ignite/internal/metrics/sources/ClockServiceMetricSourceTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metrics.sources;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.ignite.internal.hlc.ClockServiceImpl;
+import org.apache.ignite.internal.hlc.ClockWaiter;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.MetricRegistry;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mock;
+
+/**
+ * Test for clock service metric source.
+ */
+public class ClockServiceMetricSourceTest extends BaseIgniteAbstractTest {
+ private static final long MAX_CLOCK_SKEW_MILLIS = 100;
+ @Mock
+ private ClockWaiter clockWaiter;
+
+ @Test
+ public void test() throws Exception {
+ MetricRegistry registry = new MetricRegistry();
+ HybridClock clock = new HybridClockImpl();
+ ClockServiceMetricSource clockServiceMetricSource = new
ClockServiceMetricSource();
+
+ ClockServiceImpl clockService = new ClockServiceImpl(
+ clock,
+ clockWaiter,
+ () -> MAX_CLOCK_SKEW_MILLIS,
+ clockServiceMetricSource::onMaxClockSkewExceeded
+ );
+ registry.registerSource(clockServiceMetricSource);
+
+ MetricSet metricSet = registry.enable(clockServiceMetricSource);
+ assertNotNull(metricSet);
+
+ DistributionMetric clockSkewExceedingMaxClockSkew =
metricSet.get("ClockSkewExceedingMaxClockSkew");
+ assertNotNull(clockSkewExceedingMaxClockSkew);
+
+ for (int i = 0; i < clockSkewExceedingMaxClockSkew.value().length;
i++) {
+ assertEquals(0L, clockSkewExceedingMaxClockSkew.value()[i]);
+ }
+
+ // Less than max clock skew, should not update the metric.
+
clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS
/ 2));
+
+ // Should update bucket number 9
+
clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS
* 2));
+
+ // Should update bucket number 12
+
clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS
* 10));
+
clockService.updateClock(clock.current().addPhysicalTime(MAX_CLOCK_SKEW_MILLIS
* 10));
+
+ assertEquals(1L, clockSkewExceedingMaxClockSkew.value()[9]);
+ assertEquals(2L, clockSkewExceedingMaxClockSkew.value()[12]);
+
+ for (int i = 0; i < clockSkewExceedingMaxClockSkew.value().length;
i++) {
+ if (i != 9 && i != 12) {
+ assertEquals(0L, clockSkewExceedingMaxClockSkew.value()[i]);
+ }
+ }
+ }
+}
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index 72182ea9258..84122d98061 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -528,7 +528,8 @@ public class Node {
var clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
- () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS
+ () -> TestIgnitionManager.DEFAULT_MAX_CLOCK_SKEW_MS,
+ skew -> {}
);
ReplicaService replicaSvc = new ReplicaService(
diff --git
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 2dc47c1b2f9..4af0ab6230c 100644
---
a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++
b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -66,7 +66,8 @@ class ItMetricControllerTest extends
ClusterPerClassIntegrationTest {
new MetricSource("transactions", true),
new MetricSource("resource.vacuum", true),
new MetricSource("placement-driver", true),
- new MetricSource("zones.Default", true)
+ new MetricSource("zones.Default", true),
+ new MetricSource("clock.service", true)
};
@Inject
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 586c8e6e04e..fe38c043d51 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -565,7 +565,8 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
ClockService clockService = new ClockServiceImpl(
hybridClock,
clockWaiter,
- () -> schemaSyncConfiguration.maxClockSkewMillis().value()
+ () -> schemaSyncConfiguration.maxClockSkewMillis().value(),
+ skew -> {}
);
var lowWatermark = new LowWatermarkImpl(
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index c4e079af912..4eca1ae0a8d 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -170,6 +170,7 @@ import org.apache.ignite.internal.metrics.MetricManager;
import org.apache.ignite.internal.metrics.MetricManagerImpl;
import
org.apache.ignite.internal.metrics.configuration.MetricExtensionConfiguration;
import org.apache.ignite.internal.metrics.messaging.MetricMessaging;
+import org.apache.ignite.internal.metrics.sources.ClockServiceMetricSource;
import org.apache.ignite.internal.metrics.sources.JvmMetricSource;
import org.apache.ignite.internal.metrics.sources.OsMetricSource;
import org.apache.ignite.internal.network.ChannelType;
@@ -508,6 +509,8 @@ public class IgniteImpl implements Ignite {
@Nullable
private volatile ClusterState clusterState;
+ private final ClockServiceMetricSource clockServiceMetricSource;
+
/**
* The Constructor.
*
@@ -829,7 +832,14 @@ public class IgniteImpl implements Ignite {
SchemaSynchronizationConfiguration schemaSyncConfig =
clusterConfigRegistry
.getConfiguration(SchemaSynchronizationExtensionConfiguration.KEY).schemaSync();
- clockService = new ClockServiceImpl(clock, clockWaiter, () ->
schemaSyncConfig.maxClockSkewMillis().value());
+ clockServiceMetricSource = new ClockServiceMetricSource();
+
+ clockService = new ClockServiceImpl(
+ clock,
+ clockWaiter,
+ () -> schemaSyncConfig.maxClockSkewMillis().value(),
+ clockServiceMetricSource::onMaxClockSkewExceeded
+ );
idempotentCacheVacuumizer = new IdempotentCacheVacuumizer(
name,
@@ -1463,6 +1473,9 @@ public class IgniteImpl implements Ignite {
metricManager.registerSource(osMetrics);
metricManager.enable(osMetrics);
+ metricManager.registerSource(clockServiceMetricSource);
+ metricManager.enable(clockServiceMetricSource);
+
// Start the components that are required to join the cluster.
// TODO https://issues.apache.org/jira/browse/IGNITE-22570
CompletableFuture<Void> componentsStartFuture =
lifecycleManager.startComponentsAsync(
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
index baca5351f23..1482ee20184 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/storage/InternalTableEstimatedSizeTest.java
@@ -210,7 +210,12 @@ public class InternalTableEstimatedSizeTest extends
BaseIgniteAbstractTest {
node = clusterService.topologyService().localMember();
- var clockService = new ClockServiceImpl(clock, clockWaiter, () -> 0);
+ var clockService = new ClockServiceImpl(
+ clock,
+ clockWaiter,
+ () -> 0,
+ skew -> {}
+ );
table = new InternalTableImpl(
QualifiedNameHelper.fromNormalized(SqlCommon.DEFAULT_SCHEMA_NAME, TABLE_NAME),