This is an automated email from the ASF dual-hosted git repository.
mlbiscoc pushed a commit to branch feature/SOLR-17458
in repository https://gitbox.apache.org/repos/asf/solr.git
The following commit(s) were added to refs/heads/feature/SOLR-17458 by this
push:
new dfb05fd781d SOLR-17806: Migrate thread pool metrics to OTEL (#3519)
dfb05fd781d is described below
commit dfb05fd781d2a2cf02c44c7024239e047754de04
Author: Jude Muriithi <[email protected]>
AuthorDate: Wed Sep 17 16:16:33 2025 -0400
SOLR-17806: Migrate thread pool metrics to OTEL (#3519)
* Stub out OtelInstrumentedeExecutorService
* Finish OTEL ExecutorService implementation and replace MetricsUtil calls
* Add OtelUnits to timers
* spotlessApply
* feedback
* Use IOUtils.closeQuietly
* Check in partial tests
* Add pool metrics testing
* use utility method in tests
---------
Co-authored-by: jmuriithi3 <[email protected]>
---
.../java/org/apache/solr/core/CoreContainer.java | 7 +-
.../solr/handler/admin/CoreAdminHandler.java | 16 +-
.../handler/component/HttpShardHandlerFactory.java | 6 +-
.../org/apache/solr/update/UpdateShardHandler.java | 12 +-
.../org/apache/solr/util/stats/MetricUtils.java | 23 +-
.../stats/OtelInstrumentedExecutorService.java | 302 +++++++++++++++++++++
.../stats/OtelInstrumentedExecutorServiceTest.java | 238 ++++++++++++++++
.../org/apache/solr/util/SolrMetricTestUtils.java | 10 +
8 files changed, 572 insertions(+), 42 deletions(-)
diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 955e343a34e..737a8628be4 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1122,10 +1122,9 @@ public class CoreContainer {
ExecutorUtil.newMDCAwareFixedThreadPool(
cfg.getCoreLoadThreadCount(isZooKeeperAware()),
new SolrNamedThreadFactory("coreLoadExecutor")),
- null,
-
metricManager.registry(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node)),
- SolrMetricManager.mkName(
- "coreLoadExecutor",
SolrInfoBean.Category.CONTAINER.toString(), "threadPool"));
+ solrMetricsContext,
+ SolrInfoBean.Category.CONTAINER,
+ "coreLoadExecutor");
coreSorter =
loader.newInstance(
diff --git
a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index b0237978593..41306919693 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -74,7 +74,6 @@ import org.apache.solr.handler.admin.api.SplitCoreAPI;
import org.apache.solr.handler.admin.api.SwapCores;
import org.apache.solr.handler.admin.api.UnloadCore;
import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.metrics.SolrMetricsContext;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.response.SolrQueryResponse;
@@ -128,7 +127,6 @@ public class CoreAdminHandler extends RequestHandlerBase
implements PermissionNa
+ "it is a special Handler configured directly by the
RequestDispatcher");
}
- // TODO SOLR-17458: Migrate to Otel
@Override
public void initializeMetrics(
SolrMetricsContext parentContext, Attributes attributes, String scope) {
@@ -136,18 +134,16 @@ public class CoreAdminHandler extends RequestHandlerBase
implements PermissionNa
coreAdminAsyncTracker.standardExecutor =
MetricUtils.instrumentedExecutorService(
coreAdminAsyncTracker.standardExecutor,
- this,
- solrMetricsContext.getMetricRegistry(),
- SolrMetricManager.mkName(
- "parallelCoreAdminExecutor", getCategory().name(), scope,
"threadPool"));
+ solrMetricsContext,
+ getCategory(),
+ "parallelCoreAdminExecutor");
coreAdminAsyncTracker.expensiveExecutor =
MetricUtils.instrumentedExecutorService(
coreAdminAsyncTracker.expensiveExecutor,
- this,
- solrMetricsContext.getMetricRegistry(),
- SolrMetricManager.mkName(
- "parallelCoreExpensiveAdminExecutor", getCategory().name(),
scope, "threadPool"));
+ solrMetricsContext,
+ getCategory(),
+ "parallelCoreExpensiveAdminExecutor");
}
@Override
diff --git
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index f9769387589..435a04eda7f 100644
---
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -442,13 +442,9 @@ public class HttpShardHandlerFactory extends
ShardHandlerFactory
SolrMetricsContext parentContext, Attributes attributes, String scope) {
solrMetricsContext = parentContext.getChildContext(this);
String expandedScope = SolrMetricManager.mkName(scope,
SolrInfoBean.Category.QUERY.name());
- // TODO SOLR-17458: Add Otel
httpListenerFactory.initializeMetrics(solrMetricsContext,
Attributes.empty(), expandedScope);
commExecutor =
MetricUtils.instrumentedExecutorService(
- commExecutor,
- null,
- solrMetricsContext.getMetricRegistry(),
- SolrMetricManager.mkName("httpShardExecutor", expandedScope,
"threadPool"));
+ commExecutor, solrMetricsContext, SolrInfoBean.Category.QUERY,
"httpShardExecutor");
}
}
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 27da324d34d..b1994866893 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -203,28 +203,20 @@ public class UpdateShardHandler implements SolrInfoBean {
return this.getClass().getName();
}
- // TODO SOLR-17458: Add Otel
@Override
public void initializeMetrics(
SolrMetricsContext parentContext, Attributes attributes, String scope) {
solrMetricsContext = parentContext.getChildContext(this);
String expandedScope = SolrMetricManager.mkName(scope,
getCategory().name());
- // TODO SOLR-17458: Add Otel
trackHttpSolrMetrics.initializeMetrics(solrMetricsContext,
Attributes.empty(), expandedScope);
defaultConnectionManager.initializeMetrics(
solrMetricsContext, Attributes.empty(), expandedScope);
updateExecutor =
MetricUtils.instrumentedExecutorService(
- updateExecutor,
- this,
- solrMetricsContext.getMetricRegistry(),
- SolrMetricManager.mkName("updateOnlyExecutor", expandedScope,
"threadPool"));
+ updateExecutor, solrMetricsContext, getCategory(),
"updateOnlyExecutor");
recoveryExecutor =
MetricUtils.instrumentedExecutorService(
- recoveryExecutor,
- this,
- solrMetricsContext.getMetricRegistry(),
- SolrMetricManager.mkName("recoveryExecutor", expandedScope,
"threadPool"));
+ recoveryExecutor, solrMetricsContext, getCategory(),
"recoveryExecutor");
}
@Override
diff --git a/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
b/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
index 9cff065c06f..3447e3e6f0a 100644
--- a/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
+++ b/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
@@ -19,7 +19,6 @@ package org.apache.solr.util.stats;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
-import com.codahale.metrics.InstrumentedExecutorService;
import com.codahale.metrics.Meter;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricFilter;
@@ -56,6 +55,7 @@ import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.AggregateMetric;
import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.metrics.SolrMetricsContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -744,18 +744,6 @@ public class MetricUtils {
}
}
- /** Returns an instrumented wrapper over the given executor service. */
- public static ExecutorService instrumentedExecutorService(
- ExecutorService delegate, SolrInfoBean info, MetricRegistry
metricRegistry, String scope) {
- if (info != null && info.getSolrMetricsContext() != null) {
-
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope,
"submitted"));
-
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope,
"running"));
-
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope,
"completed"));
-
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope,
"duration"));
- }
- return new InstrumentedExecutorService(delegate, metricRegistry, scope);
- }
-
/**
* Creates a set of metrics (gauges) that correspond to available bean
properties for the provided
* MXBean.
@@ -829,6 +817,15 @@ public class MetricUtils {
"com.ibm.lang.management.OperatingSystemMXBean"
};
+  /**
+   * Wraps {@code delegate} in an {@link OtelInstrumentedExecutorService} so that task counts, task
+   * timings and, for thread-pool or fork-join delegates, pool gauges are reported through OTEL.
+   *
+   * @param delegate the executor whose tasks should be instrumented
+   * @param ctx metrics context used to create the instruments
+   * @param category category recorded as an attribute on every metric
+   * @param name executor name recorded as an attribute on every metric
+   * @return the instrumented executor, to be used in place of {@code delegate}
+   */
+  public static ExecutorService instrumentedExecutorService(
+      ExecutorService delegate,
+      SolrMetricsContext ctx,
+      SolrInfoBean.Category category,
+      String name) {
+    final ExecutorService instrumented =
+        new OtelInstrumentedExecutorService(delegate, ctx, category, name);
+    return instrumented;
+  }
+
/**
* Creates a set of metrics (gauges) that correspond to available bean
properties for the provided
* MXBean.
diff --git
a/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java
b/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java
new file mode 100644
index 00000000000..2823826d4b5
--- /dev/null
+++
b/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.util.stats;
+
+import static org.apache.solr.metrics.SolrMetricProducer.CATEGORY_ATTR;
+import static org.apache.solr.metrics.SolrMetricProducer.TYPE_ATTR;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.core.SolrInfoBean;
+import org.apache.solr.metrics.SolrMetricsContext;
+import org.apache.solr.metrics.otel.OtelUnit;
+import org.apache.solr.metrics.otel.instruments.AttributedLongCounter;
+import org.apache.solr.metrics.otel.instruments.AttributedLongTimer;
+import
org.apache.solr.metrics.otel.instruments.AttributedLongTimer.MetricTimer;
+import org.apache.solr.metrics.otel.instruments.AttributedLongUpDownCounter;
+
+/**
+ * OTEL instrumentation wrapper around {@link ExecutorService}. Based on {@link
+ * com.codahale.metrics.InstrumentedExecutorService}.
+ */
+public class OtelInstrumentedExecutorService implements ExecutorService {
+ public static final AttributeKey<String> EXECUTOR_NAME_ATTR =
+ AttributeKey.stringKey("executor_name");
+
+ private final ExecutorService delegate;
+ private final String executorName;
+ private final List<AutoCloseable> observableMetrics;
+ private final AttributedLongCounter submitted;
+ private final AttributedLongUpDownCounter running;
+ private final AttributedLongCounter completed;
+ private final AttributedLongTimer idle;
+ private final AttributedLongTimer duration;
+
+ public OtelInstrumentedExecutorService(
+ ExecutorService delegate,
+ SolrMetricsContext ctx,
+ SolrInfoBean.Category category,
+ String executorName) {
+ this.delegate = delegate;
+ this.executorName = executorName;
+ this.observableMetrics = new ArrayList<>();
+
+ Attributes attrs =
+ Attributes.builder()
+ .put(CATEGORY_ATTR, category.toString())
+ .put(EXECUTOR_NAME_ATTR, executorName)
+ .build();
+
+ // Each metric type needs a separate name to avoid obscuring other types
+ var executorTaskCounter =
+ ctx.longCounter("solr_executor_tasks", "Number of ExecutorService
tasks");
+ this.submitted =
+ new AttributedLongCounter(
+ executorTaskCounter, attrs.toBuilder().put(TYPE_ATTR,
"submitted").build());
+ this.completed =
+ new AttributedLongCounter(
+ executorTaskCounter, attrs.toBuilder().put(TYPE_ATTR,
"completed").build());
+ this.running =
+ new AttributedLongUpDownCounter(
+ ctx.longUpDownCounter(
+ "solr_executor_tasks_running", "Number of running
ExecutorService tasks"),
+ attrs);
+ var executorTaskTimer =
+ ctx.longHistogram(
+ "solr_executor_task_times", "Timing of ExecutorService tasks",
OtelUnit.MILLISECONDS);
+ this.idle =
+ new AttributedLongTimer(
+ executorTaskTimer, attrs.toBuilder().put(TYPE_ATTR,
"idle").build());
+ this.duration =
+ new AttributedLongTimer(
+ executorTaskTimer, attrs.toBuilder().put(TYPE_ATTR,
"duration").build());
+
+ if (delegate instanceof ThreadPoolExecutor) {
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor) delegate;
+ observableMetrics.add(
+ ctx.observableLongGauge(
+ "solr_executor_thread_pool_size",
+ "Thread pool size",
+ measurement -> {
+ measurement.record(
+ threadPool.getPoolSize(), attrs.toBuilder().put(TYPE_ATTR,
"size").build());
+ measurement.record(
+ threadPool.getCorePoolSize(),
attrs.toBuilder().put(TYPE_ATTR, "core").build());
+ measurement.record(
+ threadPool.getMaximumPoolSize(),
+ attrs.toBuilder().put(TYPE_ATTR, "max").build());
+ }));
+
+ final BlockingQueue<Runnable> taskQueue = threadPool.getQueue();
+ observableMetrics.add(
+ ctx.observableLongGauge(
+ "solr_executor_thread_pool_tasks",
+ "Thread pool task counts",
+ measurement -> {
+ measurement.record(
+ threadPool.getActiveCount(),
+ attrs.toBuilder().put(TYPE_ATTR, "active").build());
+ measurement.record(
+ threadPool.getCompletedTaskCount(),
+ attrs.toBuilder().put(TYPE_ATTR, "completed").build());
+ measurement.record(
+ taskQueue.size(), attrs.toBuilder().put(TYPE_ATTR,
"queued").build());
+ measurement.record(
+ taskQueue.remainingCapacity(),
+ attrs.toBuilder().put(TYPE_ATTR, "capacity").build());
+ }));
+ } else if (delegate instanceof ForkJoinPool) {
+ ForkJoinPool forkJoinPool = (ForkJoinPool) delegate;
+ observableMetrics.add(
+ ctx.observableLongGauge(
+ "solr_executor_fork_join_pool_tasks",
+ "Fork join pool task counts",
+ measurement -> {
+ measurement.record(
+ forkJoinPool.getStealCount(),
+ attrs.toBuilder().put(TYPE_ATTR, "stolen").build());
+ measurement.record(
+ forkJoinPool.getQueuedTaskCount(),
+ attrs.toBuilder().put(TYPE_ATTR, "queued").build());
+ }));
+ observableMetrics.add(
+ ctx.observableLongGauge(
+ "solr_executor_fork_join_pool_threads",
+ "Fork join pool thread counts",
+ measurement -> {
+ measurement.record(
+ forkJoinPool.getActiveThreadCount(),
+ attrs.toBuilder().put(TYPE_ATTR, "active").build());
+ measurement.record(
+ forkJoinPool.getRunningThreadCount(),
+ attrs.toBuilder().put(TYPE_ATTR, "running").build());
+ }));
+ }
+ }
+
+ @Override
+ public void execute(Runnable task) {
+ submitted.inc();
+ delegate.execute(new InstrumentedRunnable(task));
+ }
+
+ @Override
+ public Future<?> submit(Runnable task) {
+ submitted.inc();
+ return delegate.submit(new InstrumentedRunnable(task));
+ }
+
+ @Override
+ public <T> Future<T> submit(Runnable task, T result) {
+ submitted.inc();
+ return delegate.submit(new InstrumentedRunnable(task), result);
+ }
+
+ @Override
+ public <T> Future<T> submit(Callable<T> task) {
+ submitted.inc();
+ return delegate.submit(new InstrumentedCallable<>(task));
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException {
+ submitted.add(Long.valueOf(tasks.size()));
+ return delegate.invokeAll(instrument(tasks));
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(
+ Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+ throws InterruptedException {
+ submitted.add(Long.valueOf(tasks.size()));
+ return delegate.invokeAll(instrument(tasks), timeout, unit);
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+ throws InterruptedException, ExecutionException {
+ submitted.add(Long.valueOf(tasks.size()));
+ return delegate.invokeAny(instrument(tasks));
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long
timeout, TimeUnit unit)
+ throws InterruptedException, ExecutionException, TimeoutException {
+ submitted.add(Long.valueOf(tasks.size()));
+ return delegate.invokeAny(instrument(tasks), timeout, unit);
+ }
+
+ @Override
+ public void shutdown() {
+ delegate.shutdown();
+ observableMetrics.stream().forEach(IOUtils::closeQuietly);
+ }
+
+ @Override
+ public List<Runnable> shutdownNow() {
+ List<Runnable> tasks = delegate.shutdownNow();
+ observableMetrics.stream().forEach(IOUtils::closeQuietly);
+ return tasks;
+ }
+
+ @Override
+ public boolean isShutdown() {
+ return delegate.isShutdown();
+ }
+
+ @Override
+ public boolean isTerminated() {
+ return delegate.isTerminated();
+ }
+
+ @Override
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws
InterruptedException {
+ return delegate.awaitTermination(timeout, unit);
+ }
+
+ private <T> List<InstrumentedCallable<T>> instrument(Collection<? extends
Callable<T>> tasks) {
+ List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());
+ for (Callable<T> task : tasks) {
+ instrumented.add(new InstrumentedCallable<>(task));
+ }
+ return instrumented;
+ }
+
+ private class InstrumentedRunnable implements Runnable {
+ private final Runnable task;
+ private final MetricTimer idleTimer;
+
+ InstrumentedRunnable(Runnable task) {
+ this.task = task;
+ this.idleTimer = idle.start();
+ }
+
+ @Override
+ public void run() {
+ idleTimer.stop();
+ running.inc();
+
+ MetricTimer durationTimer = duration.start();
+ try {
+ task.run();
+ } finally {
+ durationTimer.stop();
+ running.dec();
+ completed.inc();
+ }
+ }
+ }
+
+ private class InstrumentedCallable<T> implements Callable<T> {
+ private final Callable<T> task;
+ private final MetricTimer idleTimer;
+
+ InstrumentedCallable(Callable<T> task) {
+ this.task = task;
+ this.idleTimer = idle.start();
+ }
+
+ @Override
+ public T call() throws Exception {
+ idleTimer.stop();
+ running.inc();
+
+ MetricTimer durationTimer = duration.start();
+ try {
+ return task.call();
+ } finally {
+ durationTimer.stop();
+ running.dec();
+ completed.inc();
+ }
+ }
+ }
+}
diff --git
a/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java
b/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java
new file mode 100644
index 00000000000..2d7ad162e4f
--- /dev/null
+++
b/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.util.stats;
+
+import static
com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
+import static org.apache.solr.metrics.SolrMetricProducer.TYPE_ATTR;
+
+import io.prometheus.metrics.model.snapshots.CounterSnapshot;
+import
io.prometheus.metrics.model.snapshots.CounterSnapshot.CounterDataPointSnapshot;
+import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
+import
io.prometheus.metrics.model.snapshots.GaugeSnapshot.GaugeDataPointSnapshot;
+import io.prometheus.metrics.model.snapshots.HistogramSnapshot;
+import
io.prometheus.metrics.model.snapshots.HistogramSnapshot.HistogramDataPointSnapshot;
+import io.prometheus.metrics.model.snapshots.MetricSnapshots;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.SolrTestCase;
+import org.apache.solr.core.SolrInfoBean;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.metrics.SolrMetricsContext;
+import org.apache.solr.util.SolrMetricTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests for {@link OtelInstrumentedExecutorService}: task counters, task timers, and the pool
+ * gauges registered for {@link java.util.concurrent.ThreadPoolExecutor} and {@link
+ * java.util.concurrent.ForkJoinPool} delegates.
+ */
+public class OtelInstrumentedExecutorServiceTest extends SolrTestCase {
+  public static final int PARALLELISM = 10;
+  public static final long EXEC_TIMEOUT = 1;
+  public static final TimeUnit EXEC_TIMEOUT_UNITS = TimeUnit.SECONDS;
+  public static final String REGISTRY_NAME = "solr-test-otel-registry";
+  public static final String TAG_NAME = "solr-test-otel-tag";
+  public static final double DELTA = 1E-8;
+
+  /** Upper bound when waiting for a shut-down executor; the wait ends as soon as tasks finish. */
+  private static final long TERMINATION_TIMEOUT_SECONDS = 30;
+
+  public static SolrMetricsContext metricsContext;
+
+  @Before
+  public void setUpMetrics() {
+    metricsContext = new SolrMetricsContext(new SolrMetricManager(), REGISTRY_NAME, TAG_NAME);
+  }
+
+  @Test
+  public void taskCount() throws InterruptedException {
+    assertTaskCounts("taskCount", 225);
+  }
+
+  @Test
+  public void taskRandomCount() throws InterruptedException {
+    assertTaskCounts("taskRandomCount", randomIntBetween(1, 500));
+  }
+
+  @Test
+  public void taskTimers() throws InterruptedException {
+    try (var exec = testExecutor("taskTimers", Executors.newFixedThreadPool(PARALLELISM))) {
+      final long durationMs = 200;
+      final double durationDeltaMs = 10.0;
+      exec.submit(
+          () -> {
+            try {
+              Thread.sleep(durationMs);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+            }
+          });
+      // Timers are unaffected by shutdown(), so wait for the task to actually finish.
+      shutdownAndAwait(exec);
+
+      HistogramSnapshot taskTimers =
+          collectMetrics().stream()
+              .filter(
+                  m -> m.getMetadata().getPrometheusName().startsWith("solr_executor_task_times"))
+              .findFirst()
+              .map(HistogramSnapshot.class::cast)
+              .orElseThrow(() -> new AssertionError("solr_executor_task_times not found"));
+
+      HistogramDataPointSnapshot idleTimer = getHistogramData(taskTimers, "idle");
+      HistogramDataPointSnapshot durationTimer = getHistogramData(taskTimers, "duration");
+
+      assertTrue(TimeUnit.SECONDS.toMillis(5) > idleTimer.getSum());
+      // Thread.sleep only guarantees a lower bound. On a busy machine scheduling can overshoot by
+      // far more than a few milliseconds, so the upper bound is deliberately generous.
+      assertTrue(durationTimer.getSum() >= durationMs - durationDeltaMs);
+      assertTrue(TimeUnit.SECONDS.toMillis(5) > durationTimer.getSum());
+    }
+  }
+
+  @Test
+  public void threadPoolTasks() throws InterruptedException {
+    try (var exec = testExecutor("threadPoolTasks", Executors.newFixedThreadPool(PARALLELISM))) {
+      final int numTasks = 225;
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      // Deliberately not shut down: shutdown() unregisters the pool gauges read below. Without a
+      // shutdown, awaitTermination just waits out the timeout, giving the pool time to go idle.
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics = collectMetrics();
+      GaugeSnapshot sizeGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_thread_pool_size");
+      GaugeSnapshot taskGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_thread_pool_tasks");
+
+      GaugeDataPointSnapshot poolSize = getGaugeData(sizeGauges, "size");
+      GaugeDataPointSnapshot corePoolSize = getGaugeData(sizeGauges, "core");
+      GaugeDataPointSnapshot maxPoolSize = getGaugeData(sizeGauges, "max");
+
+      GaugeDataPointSnapshot activeTasks = getGaugeData(taskGauges, "active");
+      GaugeDataPointSnapshot completedTasks = getGaugeData(taskGauges, "completed");
+      GaugeDataPointSnapshot queuedTasks = getGaugeData(taskGauges, "queued");
+      GaugeDataPointSnapshot poolCapacity = getGaugeData(taskGauges, "capacity");
+
+      assertEquals(PARALLELISM, poolSize.getValue(), DELTA);
+      assertEquals(PARALLELISM, corePoolSize.getValue(), DELTA);
+      assertEquals(PARALLELISM, maxPoolSize.getValue(), DELTA);
+
+      assertEquals(0.0, activeTasks.getValue(), DELTA);
+      assertEquals(numTasks, completedTasks.getValue(), DELTA);
+      assertEquals(0.0, queuedTasks.getValue(), DELTA);
+      assertEquals(Integer.MAX_VALUE, poolCapacity.getValue(), DELTA);
+    }
+  }
+
+  @Test
+  public void forkJoinPoolTasks() throws InterruptedException {
+    try (var exec =
+        testExecutor("forkJoinPoolTasks", Executors.newWorkStealingPool(PARALLELISM))) {
+      final int numTasks = 225;
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      // Deliberately not shut down (see threadPoolTasks): this only waits for the pool to settle.
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics = collectMetrics();
+      GaugeSnapshot taskGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_fork_join_pool_tasks");
+      GaugeSnapshot threadGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_fork_join_pool_threads");
+      GaugeDataPointSnapshot stolenTasks = getGaugeData(taskGauges, "stolen");
+      GaugeDataPointSnapshot queuedTasks = getGaugeData(taskGauges, "queued");
+
+      GaugeDataPointSnapshot activeThreads = getGaugeData(threadGauges, "active");
+      GaugeDataPointSnapshot runningThreads = getGaugeData(threadGauges, "running");
+
+      // The steal count is nondeterministic; it only has to be a sane (non-negative) value.
+      // (getValue() returns a primitive double, so an assertNotNull check would be vacuous.)
+      assertTrue(stolenTasks.getValue() >= 0.0);
+      assertEquals(0.0, queuedTasks.getValue(), DELTA);
+
+      assertEquals(0.0, activeThreads.getValue(), DELTA);
+      assertEquals(0.0, runningThreads.getValue(), DELTA);
+    }
+  }
+
+  /** Submits {@code numTasks} no-op tasks and checks the submitted/completed/running metrics. */
+  private static void assertTaskCounts(String executorName, int numTasks)
+      throws InterruptedException {
+    try (var exec = testExecutor(executorName, Executors.newFixedThreadPool(PARALLELISM))) {
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      // Counters are unaffected by shutdown(), so wait for every task to actually finish rather
+      // than idling until a timeout elapses.
+      shutdownAndAwait(exec);
+
+      MetricSnapshots metrics = collectMetrics();
+      GaugeSnapshot tasksRunning =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_tasks_running");
+      CounterSnapshot taskCounters =
+          SolrMetricTestUtils.getMetricSnapshot(
+              CounterSnapshot.class, metrics, "solr_executor_tasks");
+
+      GaugeDataPointSnapshot runningTasks = tasksRunning.getDataPoints().getFirst();
+      CounterDataPointSnapshot submittedTasks = getCounterData(taskCounters, "submitted");
+      CounterDataPointSnapshot completedTasks = getCounterData(taskCounters, "completed");
+
+      assertEquals(0.0, runningTasks.getValue(), DELTA);
+      assertEquals(numTasks, submittedTasks.getValue(), DELTA);
+      assertEquals(numTasks, completedTasks.getValue(), DELTA);
+    }
+  }
+
+  /** Shuts {@code exec} down and fails the test if it does not terminate in time. */
+  private static void shutdownAndAwait(ExecutorService exec) throws InterruptedException {
+    exec.shutdown();
+    assertTrue(
+        "executor did not terminate in time",
+        exec.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+  }
+
+  /** Collects a snapshot of every metric in the test registry. */
+  private static MetricSnapshots collectMetrics() {
+    return metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect();
+  }
+
+  private static ExecutorService testExecutor(String name, ExecutorService exec) {
+    return MetricUtils.instrumentedExecutorService(
+        exec, metricsContext, SolrInfoBean.Category.ADMIN, name);
+  }
+
+  private static CounterDataPointSnapshot getCounterData(CounterSnapshot snapshot, String type) {
+    return snapshot.getDataPoints().stream()
+        .filter(data -> type.equals(data.getLabels().get(TYPE_ATTR.toString())))
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("No counter data point with type=" + type));
+  }
+
+  private static GaugeDataPointSnapshot getGaugeData(GaugeSnapshot snapshot, String type) {
+    return snapshot.getDataPoints().stream()
+        .filter(data -> type.equals(data.getLabels().get(TYPE_ATTR.toString())))
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("No gauge data point with type=" + type));
+  }
+
+  private static HistogramDataPointSnapshot getHistogramData(
+      HistogramSnapshot snapshot, String type) {
+    return snapshot.getDataPoints().stream()
+        .filter(data -> type.equals(data.getLabels().get(TYPE_ATTR.toString())))
+        .findFirst()
+        .orElseThrow(() -> new AssertionError("No histogram data point with type=" + type));
+  }
+}
diff --git
a/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
b/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
index 8fcafc125c9..3531094cfc3 100644
--- a/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
+++ b/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
@@ -25,6 +25,7 @@ import
io.prometheus.metrics.model.snapshots.DataPointSnapshot;
import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
import io.prometheus.metrics.model.snapshots.HistogramSnapshot;
import io.prometheus.metrics.model.snapshots.Labels;
+import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import java.util.ArrayList;
import java.util.HashMap;
@@ -210,6 +211,15 @@ public final class SolrMetricTestUtils {
return
container.getMetricManager().getPrometheusMetricReader(registryName);
}
+  /**
+   * Returns the first snapshot in {@code metrics} whose Prometheus name equals {@code name}, cast
+   * to {@code snapshotClass}.
+   *
+   * @throws AssertionError if no metric with that name is present; the message names the missing
+   *     metric so the test failure is self-explanatory
+   * @throws ClassCastException if the metric exists but is not an instance of {@code
+   *     snapshotClass}
+   */
+  public static <S extends MetricSnapshot> S getMetricSnapshot(
+      Class<S> snapshotClass, MetricSnapshots metrics, String name) {
+    return metrics.stream()
+        .filter(m -> m.getMetadata().getPrometheusName().equals(name))
+        .findFirst()
+        .map(snapshotClass::cast)
+        .orElseThrow(() -> new AssertionError("No metric snapshot named " + name));
+  }
+
private static <T> T getDatapoint(
SolrCore core, String metricName, Labels labels, Class<T> snapshotType) {
var reader = getPrometheusMetricReader(core);