This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 696b49676ff IGNITE-22912 Add metrics related to Meta Storage availability (#7845)
696b49676ff is described below
commit 696b49676ffaa7e742663d3e32e12387dcafaa0b
Author: Mirza Aliev <[email protected]>
AuthorDate: Fri Mar 27 17:57:20 2026 +0400
IGNITE-22912 Add metrics related to Meta Storage availability (#7845)
---
.../ignite/internal/cli/CliIntegrationTest.java | 1 +
.../metastorage/impl/MetaStorageManagerImpl.java | 78 ++++++++++++++++++++-
.../metrics/MetaStorageMetricSource.java | 32 ++++++++-
.../metrics/MetaStorageMetricSourceTest.java | 79 ++++++++++++++++++++++
.../rest/metrics/ItMetricControllerTest.java | 1 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 11 ++-
.../distributed/schema/SchemaSyncMetricSource.java | 78 +++++++++++++++++++++
.../distributed/schema/SchemaSyncServiceImpl.java | 30 +++++++-
.../schema/SchemaSyncMetricSourceTest.java | 78 +++++++++++++++++++++
.../schema/SchemaSyncServiceImplTest.java | 58 ++++++++++++++++
10 files changed, 440 insertions(+), 6 deletions(-)
diff --git a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
index b2ab7350714..07121038d86 100644
--- a/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
+++ b/modules/cli/src/integrationTest/java/org/apache/ignite/internal/cli/CliIntegrationTest.java
@@ -66,6 +66,7 @@ public abstract class CliIntegrationTest extends ClusterPerClassIntegrationTest
new MetricSource().name("os").enabled(true),
new MetricSource().name("raft").enabled(true),
new MetricSource().name("metastorage").enabled(true),
+ new MetricSource().name("schema.sync").enabled(true),
new MetricSource().name("client.handler").enabled(true),
new MetricSource().name("sql.client").enabled(true),
new MetricSource().name("sql.plan.cache").enabled(true),
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index eee80e1e468..bea68877d4e 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -38,10 +38,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
@@ -49,6 +52,7 @@ import java.util.function.Supplier;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.cluster.management.ClusterState;
import org.apache.ignite.internal.cluster.management.MetaStorageInfo;
+import org.apache.ignite.internal.cluster.management.topology.api.LogicalNode;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.SystemDistributedConfiguration;
import org.apache.ignite.internal.disaster.system.message.ResetClusterMessage;
@@ -108,6 +112,7 @@ import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.internal.util.ExceptionUtils;
@@ -214,6 +219,24 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
/** Tracks only reads from the leader, local reads are tracked by the
storage itself. */
private final ReadOperationForCompactionTracker
readOperationFromLeaderForCompactionTracker;
+    /** Delay, in milliseconds, between consecutive availability checks (used with {@code scheduleWithFixedDelay}). */
+    private static final long AVAILABILITY_CHECK_PERIOD_MS = 5_000L;
+
+    /** Time, in milliseconds, a single availability probe may take before Meta Storage is reported unavailable. */
+    private static final long AVAILABILITY_CHECK_TIMEOUT_MS = 5_000L;
+
+    /** Consistent IDs of the current Meta Storage voting peers, replaced on every committed Raft configuration. */
+    private volatile Set<String> currentVotingPeers = Set.of();
+
+    /**
+     * Outcome of the latest periodic availability probe: {@code true} if the Meta Storage majority executed a command
+     * within the timeout, {@code false} otherwise. Stays {@code false} until the first successful probe.
+     */
+    private volatile boolean msAvailable;
+
+    /** Scheduler running the periodic availability probe; {@code null} until the probe is scheduled. */
+    private @Nullable ScheduledExecutorService availabilityCheckExecutor;
+
private final MetastorageDivergencyValidator divergencyValidator = new
MetastorageDivergencyValidator();
private final RecoveryRevisionsListenerImpl recoveryRevisionsListener;
@@ -256,7 +279,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
this.storage = storage;
this.clock = clock;
this.clusterTime = new ClusterTimeImpl(clusterService.nodeName(),
busyLock, clock, failureProcessor);
- this.metaStorageMetricSource = new
MetaStorageMetricSource(clusterTime);
+ this.metaStorageMetricSource = new
MetaStorageMetricSource(clusterTime, this::computeAvailablePeers, () ->
msAvailable ? 1 : 0);
this.topologyAwareRaftGroupServiceFactory =
topologyAwareRaftGroupServiceFactory;
this.metricManager = metricManager;
this.metastorageRepairStorage = metastorageRepairStorage;
@@ -618,6 +641,8 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
private void onConfigurationCommitted(RaftGroupConfiguration
configuration) {
LOG.info("MS configuration committed {}", configuration);
+ currentVotingPeers = Set.copyOf(configuration.peers());
+
// TODO: IGNITE-23210 - use thenAccept() when implemented.
raftServiceFuture
.handle((raftService, ex) -> {
@@ -772,6 +797,16 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
metricManager.registerSource(metaStorageMetricSource);
metricManager.enable(metaStorageMetricSource);
+ availabilityCheckExecutor = Executors.newSingleThreadScheduledExecutor(
+ IgniteThreadFactory.create(clusterService.nodeName(),
"metastorage-availability-check", LOG)
+ );
+ availabilityCheckExecutor.scheduleWithFixedDelay(
+ this::checkMgAvailability,
+ AVAILABILITY_CHECK_PERIOD_MS,
+ AVAILABILITY_CHECK_PERIOD_MS,
+ TimeUnit.MILLISECONDS
+ );
+
return nullCompletedFuture();
}
@@ -809,6 +844,7 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
try {
IgniteUtils.closeAllManually(
+ () ->
IgniteUtils.shutdownAndAwaitTermination(availabilityCheckExecutor, 10,
TimeUnit.SECONDS),
() ->
metricManager.unregisterSource(metaStorageMetricSource),
clusterTime,
() -> failOrConsume(metaStorageSvcFut, new
NodeStoppingException(), MetaStorageServiceImpl::close),
@@ -1101,6 +1137,46 @@ public class MetaStorageManagerImpl implements MetaStorageManager, MetastorageGr
}
}
+    /**
+     * Counts the Meta Storage voting peers that are currently present in the local logical topology.
+     *
+     * @return Number of voting peers (matched by node name) found in the local logical topology, or {@code 0} if the
+     *         voting set is empty.
+     */
+    private int computeAvailablePeers() {
+        Set<String> votingPeers = currentVotingPeers;
+        int present = 0;
+
+        // The voting set stays empty until a Raft configuration is committed on this node; nothing to match then.
+        if (!votingPeers.isEmpty()) {
+            for (LogicalNode member : logicalTopologyService.localLogicalTopology().nodes()) {
+                if (votingPeers.contains(member.name())) {
+                    present++;
+                }
+            }
+        }
+
+        return present;
+    }
+
+    /**
+     * Performs one periodic check of Meta Storage availability by issuing a lightweight read
+     * ({@code currentRevisions}) through the Meta Storage service, and stores the outcome in {@link #msAvailable}:
+     * {@code true} if the read succeeds within {@link #AVAILABILITY_CHECK_TIMEOUT_MS}, {@code false} otherwise.
+     *
+     * <p>While the Meta Storage service future is not yet complete, the check reports Meta Storage as unavailable
+     * without touching the future. Chaining onto a pending future on every tick would attach a new dependent each
+     * time. {@code orTimeout} only completes the derived stage and does not detach it, so these dependents would
+     * accumulate. They would then all fire a burst of commands once the future finally completes.
+     */
+    private void checkMgAvailability() {
+        if (!busyLock.enterBusy()) {
+            return;
+        }
+
+        try {
+            if (!metaStorageSvcFut.isDone()) {
+                msAvailable = false;
+
+                return;
+            }
+
+            metaStorageSvcFut
+                    .thenCompose(MetaStorageServiceImpl::currentRevisions)
+                    .orTimeout(AVAILABILITY_CHECK_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                    .whenComplete((rev, ex) -> msAvailable = ex == null);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+
private void onSafeTimeAdvanced(HybridTimestamp time) {
assert time != null;
diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
index 93570716593..59249c26290 100644
--- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
+++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSource.java
@@ -18,9 +18,11 @@
package org.apache.ignite.internal.metastorage.metrics;
import java.util.List;
+import java.util.function.IntSupplier;
import
org.apache.ignite.internal.metastorage.metrics.MetaStorageMetricSource.Holder;
import org.apache.ignite.internal.metrics.AbstractMetricSource;
import org.apache.ignite.internal.metrics.AtomicIntMetric;
+import org.apache.ignite.internal.metrics.IntGauge;
import org.apache.ignite.internal.metrics.LongGauge;
import org.apache.ignite.internal.metrics.LongMetric;
import org.apache.ignite.internal.metrics.Metric;
@@ -33,13 +35,23 @@ public class MetaStorageMetricSource extends AbstractMetricSource<Holder> {
private final MetaStorageMetrics metaStorageMetrics;
+ private final IntSupplier availablePeersSupplier; // Supplies the value of the AvailablePeers gauge.
+
+ private final IntSupplier availableSupplier; // Supplies the value of the MajorityAvailable gauge.
+
/**
* Constructor.
+ *
+ * @param metaStorageMetrics Provides the value of the {@code SafeTimeLag} gauge.
+ * @param availablePeersSupplier Supplier of the {@code AvailablePeers} gauge value.
+ * @param availableSupplier Supplier of the {@code MajorityAvailable} gauge value, expected to be {@code 1} or
+ *     {@code 0}.
*/
- public MetaStorageMetricSource(MetaStorageMetrics metaStorageMetrics) {
+ public MetaStorageMetricSource(
+ MetaStorageMetrics metaStorageMetrics,
+ IntSupplier availablePeersSupplier,
+ IntSupplier availableSupplier
+ ) {
super(SOURCE_NAME);
this.metaStorageMetrics = metaStorageMetrics;
+ this.availablePeersSupplier = availablePeersSupplier;
+ this.availableSupplier = availableSupplier;
}
@Override
@@ -63,7 +75,7 @@ public class MetaStorageMetricSource extends AbstractMetricSource<Holder> {
protected class Holder implements AbstractMetricSource.Holder<Holder> {
private final LongMetric safeTimeLag = new LongGauge(
"SafeTimeLag",
- "Number of milliseconds the local MetaStorage SafeTime lags
behind the local logical clock.",
+ "Number of milliseconds the local Meta Storage SafeTime lags
behind the local logical clock.",
metaStorageMetrics::safeTimeLag
);
@@ -72,9 +84,23 @@ public class MetaStorageMetricSource extends AbstractMetricSource<Holder> {
"The current size of the cache of idempotent commands'
results."
);
+ private final IntGauge availablePeers = new IntGauge(
+ "AvailablePeers",
+ "Number of available members of the Meta Storage voting set
based on the current logical topology.",
+ availablePeersSupplier
+ );
+
+ private final IntGauge majorityAvailable = new IntGauge(
+ "MajorityAvailable",
+ "1 if the Meta Storage majority is available (can execute
commands), 0 otherwise.",
+ availableSupplier
+ );
+
private final List<Metric> metrics = List.of(
safeTimeLag,
- idempotentCacheSize
+ idempotentCacheSize,
+ availablePeers,
+ majorityAvailable
);
@Override
diff --git a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSourceTest.java b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSourceTest.java
new file mode 100644
index 00000000000..ff584dc1ec8
--- /dev/null
+++ b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/metrics/MetaStorageMetricSourceTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.metastorage.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.ignite.internal.metrics.IntMetric;
+import org.apache.ignite.internal.metrics.MetricRegistry;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link MetaStorageMetricSource}. */
+class MetaStorageMetricSourceTest extends BaseIgniteAbstractTest {
+    /** Checks that {@code AvailablePeers} re-reads its supplier on every access. */
+    @Test
+    void availablePeersMetricReadsFromSupplier() {
+        int[] peers = {3};
+
+        MetricSet metricSet = enableSource(new MetaStorageMetricSource(() -> 0L, () -> peers[0], () -> 0));
+
+        IntMetric metric = metricSet.get("AvailablePeers");
+
+        assertNotNull(metric);
+        assertEquals(3, metric.value());
+
+        peers[0] = 1;
+
+        assertEquals(1, metric.value());
+    }
+
+    /** Checks that {@code MajorityAvailable} re-reads its supplier on every access. */
+    @Test
+    void availableMetricReadsFromSupplier() {
+        int[] majority = {0};
+
+        MetricSet metricSet = enableSource(new MetaStorageMetricSource(() -> 0L, () -> 0, () -> majority[0]));
+
+        IntMetric metric = metricSet.get("MajorityAvailable");
+
+        assertNotNull(metric);
+        assertEquals(0, metric.value());
+
+        majority[0] = 1;
+
+        assertEquals(1, metric.value());
+    }
+
+    /** Checks that {@code IdempotentCacheSize} reflects the value pushed through the size-change callback. */
+    @Test
+    void idempotentCacheSizeIsUpdatedViaCallback() {
+        MetaStorageMetricSource source = new MetaStorageMetricSource(() -> 0L, () -> 0, () -> 0);
+        MetricSet metricSet = enableSource(source);
+
+        IntMetric cacheSize = metricSet.get("IdempotentCacheSize");
+
+        assertNotNull(cacheSize);
+        assertEquals(0, cacheSize.value());
+
+        source.onIdempotentCacheSizeChange(42);
+
+        assertEquals(42, cacheSize.value());
+    }
+
+    /** Registers the source in a fresh registry, enables it and returns the resulting metric set. */
+    private static MetricSet enableSource(MetaStorageMetricSource source) {
+        var registry = new MetricRegistry();
+
+        registry.registerSource(source);
+
+        return registry.enable(source);
+    }
+}
diff --git a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
index 3ead07d5d90..e9789aabd6b 100644
--- a/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
+++ b/modules/rest/src/integrationTest/java/org/apache/ignite/internal/rest/metrics/ItMetricControllerTest.java
@@ -54,6 +54,7 @@ class ItMetricControllerTest extends ClusterPerClassIntegrationTest {
new MetricSource("os", true),
new MetricSource("raft", true),
new MetricSource("metastorage", true),
+ new MetricSource("schema.sync", true),
new MetricSource("client.handler", true),
new MetricSource("sql.client", true),
new MetricSource("sql.plan.cache", true),
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index 5926b94d810..3471002eb7f 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -277,6 +277,7 @@ import org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeColl
import
org.apache.ignite.internal.table.distributed.raft.PartitionSafeTimeValidator;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnActionRequest;
import
org.apache.ignite.internal.table.distributed.schema.CheckCatalogVersionOnAppendEntries;
+import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncMetricSource;
import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncServiceImpl;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
@@ -944,7 +945,15 @@ public class IgniteImpl implements Ignite {
schemaSafeTimeTracker = new
SchemaSafeTimeTrackerImpl(metaStorageMgr.clusterTime(),
metaStorageMgr.watchExecutor());
metaStorageMgr.registerNotificationEnqueuedListener(schemaSafeTimeTracker);
- SchemaSyncService schemaSyncService = new
SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMsSupplier);
+ SchemaSyncMetricSource schemaSyncMetricSource = new
SchemaSyncMetricSource();
+ metricManager.registerSource(schemaSyncMetricSource);
+ metricManager.enable(schemaSyncMetricSource);
+
+ SchemaSyncService schemaSyncService = new SchemaSyncServiceImpl(
+ schemaSafeTimeTracker,
+ delayDurationMsSupplier,
+ schemaSyncMetricSource::recordWait
+ );
schemaManager = new SchemaManager(registry, catalogManager);
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSource.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSource.java
new file mode 100644
index 00000000000..044943c2edd
--- /dev/null
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSource.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import java.util.List;
+import org.apache.ignite.internal.metrics.AbstractMetricSource;
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.Metric;
+
+/**
+ * Metric source for schema synchronization metrics.
+ *
+ * <p>Publishes a single {@code Waits} distribution fed by {@link #recordWait(long)}.
+ */
+public class SchemaSyncMetricSource extends AbstractMetricSource<SchemaSyncMetricSource.Holder> {
+    private static final String SOURCE_NAME = "schema.sync";
+
+    /**
+     * Inclusive upper bounds (in milliseconds) of the schema sync wait time histogram buckets.
+     *
+     * <p>A value equal to a bound falls into the bucket that bound closes, so the buckets are:
+     * {@code <=1}, (1..5], (5..10], (10..50], (50..100], (100..500], (500..1000], (1000..5000], (5000..inf).
+     */
+    private static final long[] WAIT_BOUNDS_MS = {1, 5, 10, 50, 100, 500, 1000, 5000};
+
+    /**
+     * Constructor.
+     */
+    public SchemaSyncMetricSource() {
+        super(SOURCE_NAME);
+    }
+
+    /**
+     * Records a completed schema sync wait with the given duration.
+     *
+     * <p>Does nothing when the source currently has no holder (i.e. it is not enabled).
+     *
+     * @param durationMs Duration of the wait in milliseconds.
+     */
+    public void recordWait(long durationMs) {
+        Holder holder = holder();
+
+        if (holder != null) {
+            holder.waits.add(durationMs);
+        }
+    }
+
+    @Override
+    protected Holder createHolder() {
+        return new Holder();
+    }
+
+    /** Holder of the metrics exposed while the source is enabled. */
+    protected static class Holder implements AbstractMetricSource.Holder<Holder> {
+        private final DistributionMetric waits = new DistributionMetric(
+                "Waits",
+                "Histogram of schema synchronization wait times in milliseconds."
+                        + " High values may indicate Meta Storage unavailability or slowness.",
+                WAIT_BOUNDS_MS
+        );
+
+        private final List<Metric> metrics = List.of(waits);
+
+        @Override
+        public Iterable<Metric> metrics() {
+            return metrics;
+        }
+    }
+}
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
index 0a1b3e6a752..5bb1ce0cccc 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImpl.java
@@ -18,6 +18,8 @@
package org.apache.ignite.internal.table.distributed.schema;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.LongConsumer;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.SchemaSafeTimeTracker;
@@ -31,17 +33,43 @@ public class SchemaSyncServiceImpl implements SchemaSyncService {
private final LongSupplier delayDurationMs;
+ private final LongConsumer waitDurationMsRecorder; // Receives wait durations, in milliseconds.
+
/**
* Constructor.
+ *
+ * <p>Equivalent to the three-argument constructor with a recorder that discards every duration.
+ *
+ * @param schemaSafeTimeTracker Schema safe time tracker.
+ * @param delayDurationMs Supplier of the delay duration in milliseconds.
*/
public SchemaSyncServiceImpl(SchemaSafeTimeTracker schemaSafeTimeTracker,
LongSupplier delayDurationMs) {
+ this(schemaSafeTimeTracker, delayDurationMs, durationMs -> {});
+ }
+
+ /**
+ * Constructor with metrics recording.
+ *
+ * @param schemaSafeTimeTracker Schema safe time tracker.
+ * @param delayDurationMs Supplier of the delay duration in milliseconds.
+ * @param waitDurationMsRecorder Consumer that receives the duration (in ms) of each wait that was still pending
+ *     when requested; waits that are already complete when requested are not reported.
+ */
+ public SchemaSyncServiceImpl(
+ SchemaSafeTimeTracker schemaSafeTimeTracker,
+ LongSupplier delayDurationMs,
+ LongConsumer waitDurationMsRecorder
+ ) {
this.schemaSafeTimeTracker = schemaSafeTimeTracker;
this.delayDurationMs = delayDurationMs;
+ this.waitDurationMsRecorder = waitDurationMsRecorder;
}
@Override
public CompletableFuture<Void> waitForMetadataCompleteness(HybridTimestamp ts) {
- return schemaSafeTimeTracker.waitFor(metastoreSafeTimeToWait(ts));
+        // Start the clock before asking the tracker so the measured duration covers the whole wait.
+        long startNs = System.nanoTime();
+
+        CompletableFuture<Void> future = schemaSafeTimeTracker.waitFor(metastoreSafeTimeToWait(ts));
+
+        if (future.isDone()) {
+            // Nothing had to be waited for, so there is no duration to record.
+            return future;
+        }
+
+        // Record on a side branch and return the tracker's future itself. Returning the whenComplete() stage instead
+        // has two drawbacks. If the recorder throws, it fails an otherwise successful wait. It also hands downstream
+        // exceptionally()/handle() a CompletionException wrapper instead of the original failure.
+        future.whenComplete((v, ex) ->
+                waitDurationMsRecorder.accept(TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)));
+
+        return future;
}
private HybridTimestamp metastoreSafeTimeToWait(HybridTimestamp ts) {
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSourceTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSourceTest.java
new file mode 100644
index 00000000000..72954835b11
--- /dev/null
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncMetricSourceTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.schema;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.ignite.internal.metrics.DistributionMetric;
+import org.apache.ignite.internal.metrics.MetricRegistry;
+import org.apache.ignite.internal.metrics.MetricSet;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/** Tests for {@link SchemaSyncMetricSource}. */
+class SchemaSyncMetricSourceTest extends BaseIgniteAbstractTest {
+ private final SchemaSyncMetricSource source = new SchemaSyncMetricSource();
+ private final MetricRegistry registry = new MetricRegistry();
+ private DistributionMetric waits;
+
+ @BeforeEach
+ void setUp() {
+ registry.registerSource(source);
+ MetricSet metricSet = registry.enable(source);
+
+ waits = metricSet.get("Waits");
+ assertNotNull(waits);
+ }
+
+ @Test
+ void allBucketsAreZeroInitially() {
+ for (long count : waits.value()) {
+ assertEquals(0L, count);
+ }
+ }
+
+ @Test
+ void recordWaitPopulatesCorrectBucket() {
+ source.recordWait(7); // falls in bucket 2: (5..10]
+ source.recordWait(200); // falls in bucket 5: (100..500]
+ source.recordWait(3000); // falls in bucket 7: (1000..5000]
+
+ assertEquals(1L, waits.value()[2]);
+ assertEquals(1L, waits.value()[5]);
+ assertEquals(1L, waits.value()[7]);
+ }
+
+ @Test
+ void recordWaitAccumulatesCountsInSameBucket() {
+ source.recordWait(1); // bucket 0: [0..1]
+ source.recordWait(0); // bucket 0: [0..1]
+ source.recordWait(1); // bucket 0: [0..1]
+
+ assertEquals(3L, waits.value()[0]);
+ }
+
+ @Test
+ void recordWaitIsNoOpWhenSourceIsDisabled() {
+ SchemaSyncMetricSource disabledSource = new SchemaSyncMetricSource();
+ // not registered or enabled - should not throw
+ disabledSource.recordWait(100);
+ }
+}
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
index f6535d02ad3..acba3a96839 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/schema/SchemaSyncServiceImplTest.java
@@ -19,11 +19,17 @@ package org.apache.ignite.internal.table.distributed.schema;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.mockito.Mockito.when;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongSupplier;
import org.apache.ignite.internal.hlc.HybridClock;
@@ -70,4 +76,56 @@ class SchemaSyncServiceImplTest extends BaseIgniteAbstractTest {
safeTimeFuture.complete(null);
assertThat(waitFuture, willCompleteSuccessfully());
}
+
+    @Test
+    void waitRecorderIsCalledWithDurationOnCompletion() {
+        List<Long> durations = new ArrayList<>();
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMs, durations::add);
+
+        HybridTimestamp ts = clock.now();
+        HybridTimestamp safeTimeToWait = ts.subtractPhysicalTime(delayDurationMs.getAsLong());
+        CompletableFuture<Void> pendingSafeTime = new CompletableFuture<>();
+        when(schemaSafeTimeTracker.waitFor(safeTimeToWait)).thenReturn(pendingSafeTime);
+
+        schemaSyncService.waitForMetadataCompleteness(ts);
+
+        // Nothing is reported while the wait is still pending.
+        assertThat(durations, empty());
+
+        pendingSafeTime.complete(null);
+
+        assertThat(durations, hasSize(1));
+        assertThat(durations.get(0), greaterThanOrEqualTo(0L));
+    }
+
+    @Test
+    void waitRecorderIsCalledEvenWhenFutureCompletesExceptionally() {
+        List<Long> durations = new ArrayList<>();
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMs, durations::add);
+
+        HybridTimestamp ts = clock.now();
+        HybridTimestamp safeTimeToWait = ts.subtractPhysicalTime(delayDurationMs.getAsLong());
+        CompletableFuture<Void> pendingSafeTime = new CompletableFuture<>();
+        when(schemaSafeTimeTracker.waitFor(safeTimeToWait)).thenReturn(pendingSafeTime);
+
+        schemaSyncService.waitForMetadataCompleteness(ts);
+
+        // A failed wait must be reported just like a successful one.
+        pendingSafeTime.completeExceptionally(new RuntimeException("test error"));
+
+        assertThat(durations, hasSize(1));
+        assertThat(durations.get(0), greaterThanOrEqualTo(0L));
+    }
+
+    @Test
+    void waitRecorderIsNotCalledWhenFutureIsAlreadyCompleted() {
+        List<Long> durations = new ArrayList<>();
+        schemaSyncService = new SchemaSyncServiceImpl(schemaSafeTimeTracker, delayDurationMs, durations::add);
+
+        HybridTimestamp ts = clock.now();
+        HybridTimestamp safeTimeToWait = ts.subtractPhysicalTime(delayDurationMs.getAsLong());
+        when(schemaSafeTimeTracker.waitFor(safeTimeToWait)).thenReturn(nullCompletedFuture());
+
+        schemaSyncService.waitForMetadataCompleteness(ts);
+
+        // Nothing was actually waited for, so no duration should be reported.
+        assertThat(durations, empty());
+    }
}