This is an automated email from the ASF dual-hosted git repository.

mlbiscoc pushed a commit to branch feature/SOLR-17458-rebased
in repository https://gitbox.apache.org/repos/asf/solr.git

commit 63f8fccf2599f1f637c66d116a66f92aaee9aadb
Author: Jude Muriithi <[email protected]>
AuthorDate: Wed Sep 17 16:16:33 2025 -0400

    SOLR-17806: Migrate thread pool metrics to OTEL (#3519)
    
    * Stub out OtelInstrumentedeExecutorService
    
    * Finish OTEL ExecutorService implementation and replace MetricsUtil calls
    
    * Add OtelUnits to timers
    
    * spotlessApply
    
    * feedback
    
    * Use IOUtils.closeQuietly
    
    * Check in partial tests
    
    * Add pool metrics testing
    
    * use utility method in tests
    
    ---------
    
    Co-authored-by: jmuriithi3 <[email protected]>
---
 .../java/org/apache/solr/core/CoreContainer.java   |   7 +-
 .../solr/handler/admin/CoreAdminHandler.java       |  16 +-
 .../handler/component/HttpShardHandlerFactory.java |   6 +-
 .../org/apache/solr/update/UpdateShardHandler.java |  12 +-
 .../org/apache/solr/util/stats/MetricUtils.java    |  23 +-
 .../stats/OtelInstrumentedExecutorService.java     | 302 +++++++++++++++++++++
 .../stats/OtelInstrumentedExecutorServiceTest.java | 238 ++++++++++++++++
 .../org/apache/solr/util/SolrMetricTestUtils.java  |  10 +
 8 files changed, 572 insertions(+), 42 deletions(-)

diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java 
b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
index 808ee59f6bb..2da7fc9ef91 100644
--- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java
+++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java
@@ -1040,10 +1040,9 @@ public class CoreContainer {
             ExecutorUtil.newMDCAwareFixedThreadPool(
                 cfg.getCoreLoadThreadCount(isZooKeeperAware()),
                 new SolrNamedThreadFactory("coreLoadExecutor")),
-            null,
-            
metricManager.registry(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node)),
-            SolrMetricManager.mkName(
-                "coreLoadExecutor", 
SolrInfoBean.Category.CONTAINER.toString(), "threadPool"));
+            solrMetricsContext,
+            SolrInfoBean.Category.CONTAINER,
+            "coreLoadExecutor");
 
     coreSorter =
         loader.newInstance(
diff --git 
a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java 
b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
index b0237978593..41306919693 100644
--- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
+++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java
@@ -74,7 +74,6 @@ import org.apache.solr.handler.admin.api.SplitCoreAPI;
 import org.apache.solr.handler.admin.api.SwapCores;
 import org.apache.solr.handler.admin.api.UnloadCore;
 import org.apache.solr.logging.MDCLoggingContext;
-import org.apache.solr.metrics.SolrMetricManager;
 import org.apache.solr.metrics.SolrMetricsContext;
 import org.apache.solr.request.SolrQueryRequest;
 import org.apache.solr.response.SolrQueryResponse;
@@ -128,7 +127,6 @@ public class CoreAdminHandler extends RequestHandlerBase 
implements PermissionNa
             + "it is a special Handler configured directly by the 
RequestDispatcher");
   }
 
-  // TODO SOLR-17458: Migrate to Otel
   @Override
   public void initializeMetrics(
       SolrMetricsContext parentContext, Attributes attributes, String scope) {
@@ -136,18 +134,16 @@ public class CoreAdminHandler extends RequestHandlerBase 
implements PermissionNa
     coreAdminAsyncTracker.standardExecutor =
         MetricUtils.instrumentedExecutorService(
             coreAdminAsyncTracker.standardExecutor,
-            this,
-            solrMetricsContext.getMetricRegistry(),
-            SolrMetricManager.mkName(
-                "parallelCoreAdminExecutor", getCategory().name(), scope, 
"threadPool"));
+            solrMetricsContext,
+            getCategory(),
+            "parallelCoreAdminExecutor");
 
     coreAdminAsyncTracker.expensiveExecutor =
         MetricUtils.instrumentedExecutorService(
             coreAdminAsyncTracker.expensiveExecutor,
-            this,
-            solrMetricsContext.getMetricRegistry(),
-            SolrMetricManager.mkName(
-                "parallelCoreExpensiveAdminExecutor", getCategory().name(), 
scope, "threadPool"));
+            solrMetricsContext,
+            getCategory(),
+            "parallelCoreExpensiveAdminExecutor");
   }
 
   @Override
diff --git 
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
 
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
index dd3630ea203..87c18486ba7 100644
--- 
a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
+++ 
b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java
@@ -444,13 +444,9 @@ public class HttpShardHandlerFactory extends 
ShardHandlerFactory
       SolrMetricsContext parentContext, Attributes attributes, String scope) {
     solrMetricsContext = parentContext.getChildContext(this);
     String expandedScope = SolrMetricManager.mkName(scope, 
SolrInfoBean.Category.QUERY.name());
-    // TODO SOLR-17458: Add Otel
     httpListenerFactory.initializeMetrics(solrMetricsContext, 
Attributes.empty(), expandedScope);
     commExecutor =
         MetricUtils.instrumentedExecutorService(
-            commExecutor,
-            null,
-            solrMetricsContext.getMetricRegistry(),
-            SolrMetricManager.mkName("httpShardExecutor", expandedScope, 
"threadPool"));
+            commExecutor, solrMetricsContext, SolrInfoBean.Category.QUERY, 
"httpShardExecutor");
   }
 }
diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java 
b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
index 157fc1513b0..c0d4715154a 100644
--- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
+++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java
@@ -163,26 +163,18 @@ public class UpdateShardHandler implements SolrInfoBean {
     return this.getClass().getName();
   }
 
-  // TODO SOLR-17458: Add Otel
   @Override
   public void initializeMetrics(
       SolrMetricsContext parentContext, Attributes attributes, String scope) {
     solrMetricsContext = parentContext.getChildContext(this);
     String expandedScope = SolrMetricManager.mkName(scope, 
getCategory().name());
-    // TODO SOLR-17458: Add Otel
     trackHttpSolrMetrics.initializeMetrics(solrMetricsContext, 
Attributes.empty(), expandedScope);
     updateExecutor =
         MetricUtils.instrumentedExecutorService(
-            updateExecutor,
-            this,
-            solrMetricsContext.getMetricRegistry(),
-            SolrMetricManager.mkName("updateOnlyExecutor", expandedScope, 
"threadPool"));
+            updateExecutor, solrMetricsContext, getCategory(), 
"updateOnlyExecutor");
     recoveryExecutor =
         MetricUtils.instrumentedExecutorService(
-            recoveryExecutor,
-            this,
-            solrMetricsContext.getMetricRegistry(),
-            SolrMetricManager.mkName("recoveryExecutor", expandedScope, 
"threadPool"));
+            recoveryExecutor, solrMetricsContext, getCategory(), 
"recoveryExecutor");
   }
 
   @Override
diff --git a/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java 
b/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
index 9cff065c06f..3447e3e6f0a 100644
--- a/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
+++ b/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java
@@ -19,7 +19,6 @@ package org.apache.solr.util.stats;
 import com.codahale.metrics.Counter;
 import com.codahale.metrics.Gauge;
 import com.codahale.metrics.Histogram;
-import com.codahale.metrics.InstrumentedExecutorService;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.Metric;
 import com.codahale.metrics.MetricFilter;
@@ -56,6 +55,7 @@ import org.apache.solr.common.util.NamedList;
 import org.apache.solr.core.SolrInfoBean;
 import org.apache.solr.metrics.AggregateMetric;
 import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.metrics.SolrMetricsContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -744,18 +744,6 @@ public class MetricUtils {
     }
   }
 
-  /** Returns an instrumented wrapper over the given executor service. */
-  public static ExecutorService instrumentedExecutorService(
-      ExecutorService delegate, SolrInfoBean info, MetricRegistry 
metricRegistry, String scope) {
-    if (info != null && info.getSolrMetricsContext() != null) {
-      
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, 
"submitted"));
-      
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, 
"running"));
-      
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, 
"completed"));
-      
info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, 
"duration"));
-    }
-    return new InstrumentedExecutorService(delegate, metricRegistry, scope);
-  }
-
   /**
    * Creates a set of metrics (gauges) that correspond to available bean 
properties for the provided
    * MXBean.
@@ -829,6 +817,15 @@ public class MetricUtils {
         "com.ibm.lang.management.OperatingSystemMXBean"
       };
 
+  /** Returns an instrumented wrapper over the given executor service. */
+  public static ExecutorService instrumentedExecutorService(
+      ExecutorService delegate,
+      SolrMetricsContext ctx,
+      SolrInfoBean.Category category,
+      String name) {
+    return new OtelInstrumentedExecutorService(delegate, ctx, category, name);
+  }
+
   /**
    * Creates a set of metrics (gauges) that correspond to available bean 
properties for the provided
    * MXBean.
diff --git 
a/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java
 
b/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java
new file mode 100644
index 00000000000..2823826d4b5
--- /dev/null
+++ 
b/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.util.stats;
+
+import static org.apache.solr.metrics.SolrMetricProducer.CATEGORY_ATTR;
+import static org.apache.solr.metrics.SolrMetricProducer.TYPE_ATTR;
+
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.common.Attributes;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.solr.common.util.IOUtils;
+import org.apache.solr.core.SolrInfoBean;
+import org.apache.solr.metrics.SolrMetricsContext;
+import org.apache.solr.metrics.otel.OtelUnit;
+import org.apache.solr.metrics.otel.instruments.AttributedLongCounter;
+import org.apache.solr.metrics.otel.instruments.AttributedLongTimer;
+import 
org.apache.solr.metrics.otel.instruments.AttributedLongTimer.MetricTimer;
+import org.apache.solr.metrics.otel.instruments.AttributedLongUpDownCounter;
+
+/**
+ * OTEL instrumentation wrapper around {@link ExecutorService}. Based on {@link
+ * com.codahale.metrics.InstrumentedExecutorService}.
+ */
+public class OtelInstrumentedExecutorService implements ExecutorService {
+  public static final AttributeKey<String> EXECUTOR_NAME_ATTR =
+      AttributeKey.stringKey("executor_name");
+
+  private final ExecutorService delegate;
+  private final String executorName;
+  private final List<AutoCloseable> observableMetrics;
+  private final AttributedLongCounter submitted;
+  private final AttributedLongUpDownCounter running;
+  private final AttributedLongCounter completed;
+  private final AttributedLongTimer idle;
+  private final AttributedLongTimer duration;
+
+  public OtelInstrumentedExecutorService(
+      ExecutorService delegate,
+      SolrMetricsContext ctx,
+      SolrInfoBean.Category category,
+      String executorName) {
+    this.delegate = delegate;
+    this.executorName = executorName;
+    this.observableMetrics = new ArrayList<>();
+
+    Attributes attrs =
+        Attributes.builder()
+            .put(CATEGORY_ATTR, category.toString())
+            .put(EXECUTOR_NAME_ATTR, executorName)
+            .build();
+
+    // Each metric type needs a separate name to avoid obscuring other types
+    var executorTaskCounter =
+        ctx.longCounter("solr_executor_tasks", "Number of ExecutorService 
tasks");
+    this.submitted =
+        new AttributedLongCounter(
+            executorTaskCounter, attrs.toBuilder().put(TYPE_ATTR, 
"submitted").build());
+    this.completed =
+        new AttributedLongCounter(
+            executorTaskCounter, attrs.toBuilder().put(TYPE_ATTR, 
"completed").build());
+    this.running =
+        new AttributedLongUpDownCounter(
+            ctx.longUpDownCounter(
+                "solr_executor_tasks_running", "Number of running 
ExecutorService tasks"),
+            attrs);
+    var executorTaskTimer =
+        ctx.longHistogram(
+            "solr_executor_task_times", "Timing of ExecutorService tasks", 
OtelUnit.MILLISECONDS);
+    this.idle =
+        new AttributedLongTimer(
+            executorTaskTimer, attrs.toBuilder().put(TYPE_ATTR, 
"idle").build());
+    this.duration =
+        new AttributedLongTimer(
+            executorTaskTimer, attrs.toBuilder().put(TYPE_ATTR, 
"duration").build());
+
+    if (delegate instanceof ThreadPoolExecutor) {
+      ThreadPoolExecutor threadPool = (ThreadPoolExecutor) delegate;
+      observableMetrics.add(
+          ctx.observableLongGauge(
+              "solr_executor_thread_pool_size",
+              "Thread pool size",
+              measurement -> {
+                measurement.record(
+                    threadPool.getPoolSize(), attrs.toBuilder().put(TYPE_ATTR, 
"size").build());
+                measurement.record(
+                    threadPool.getCorePoolSize(), 
attrs.toBuilder().put(TYPE_ATTR, "core").build());
+                measurement.record(
+                    threadPool.getMaximumPoolSize(),
+                    attrs.toBuilder().put(TYPE_ATTR, "max").build());
+              }));
+
+      final BlockingQueue<Runnable> taskQueue = threadPool.getQueue();
+      observableMetrics.add(
+          ctx.observableLongGauge(
+              "solr_executor_thread_pool_tasks",
+              "Thread pool task counts",
+              measurement -> {
+                measurement.record(
+                    threadPool.getActiveCount(),
+                    attrs.toBuilder().put(TYPE_ATTR, "active").build());
+                measurement.record(
+                    threadPool.getCompletedTaskCount(),
+                    attrs.toBuilder().put(TYPE_ATTR, "completed").build());
+                measurement.record(
+                    taskQueue.size(), attrs.toBuilder().put(TYPE_ATTR, 
"queued").build());
+                measurement.record(
+                    taskQueue.remainingCapacity(),
+                    attrs.toBuilder().put(TYPE_ATTR, "capacity").build());
+              }));
+    } else if (delegate instanceof ForkJoinPool) {
+      ForkJoinPool forkJoinPool = (ForkJoinPool) delegate;
+      observableMetrics.add(
+          ctx.observableLongGauge(
+              "solr_executor_fork_join_pool_tasks",
+              "Fork join pool task counts",
+              measurement -> {
+                measurement.record(
+                    forkJoinPool.getStealCount(),
+                    attrs.toBuilder().put(TYPE_ATTR, "stolen").build());
+                measurement.record(
+                    forkJoinPool.getQueuedTaskCount(),
+                    attrs.toBuilder().put(TYPE_ATTR, "queued").build());
+              }));
+      observableMetrics.add(
+          ctx.observableLongGauge(
+              "solr_executor_fork_join_pool_threads",
+              "Fork join pool thread counts",
+              measurement -> {
+                measurement.record(
+                    forkJoinPool.getActiveThreadCount(),
+                    attrs.toBuilder().put(TYPE_ATTR, "active").build());
+                measurement.record(
+                    forkJoinPool.getRunningThreadCount(),
+                    attrs.toBuilder().put(TYPE_ATTR, "running").build());
+              }));
+    }
+  }
+
+  @Override
+  public void execute(Runnable task) {
+    submitted.inc();
+    delegate.execute(new InstrumentedRunnable(task));
+  }
+
+  @Override
+  public Future<?> submit(Runnable task) {
+    submitted.inc();
+    return delegate.submit(new InstrumentedRunnable(task));
+  }
+
+  @Override
+  public <T> Future<T> submit(Runnable task, T result) {
+    submitted.inc();
+    return delegate.submit(new InstrumentedRunnable(task), result);
+  }
+
+  @Override
+  public <T> Future<T> submit(Callable<T> task) {
+    submitted.inc();
+    return delegate.submit(new InstrumentedCallable<>(task));
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    submitted.add(Long.valueOf(tasks.size()));
+    return delegate.invokeAll(instrument(tasks));
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(
+      Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit)
+      throws InterruptedException {
+    submitted.add(Long.valueOf(tasks.size()));
+    return delegate.invokeAll(instrument(tasks), timeout, unit);
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    submitted.add(Long.valueOf(tasks.size()));
+    return delegate.invokeAny(instrument(tasks));
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long 
timeout, TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    submitted.add(Long.valueOf(tasks.size()));
+    return delegate.invokeAny(instrument(tasks), timeout, unit);
+  }
+
+  @Override
+  public void shutdown() {
+    delegate.shutdown();
+    observableMetrics.stream().forEach(IOUtils::closeQuietly);
+  }
+
+  @Override
+  public List<Runnable> shutdownNow() {
+    List<Runnable> tasks = delegate.shutdownNow();
+    observableMetrics.stream().forEach(IOUtils::closeQuietly);
+    return tasks;
+  }
+
+  @Override
+  public boolean isShutdown() {
+    return delegate.isShutdown();
+  }
+
+  @Override
+  public boolean isTerminated() {
+    return delegate.isTerminated();
+  }
+
+  @Override
+  public boolean awaitTermination(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return delegate.awaitTermination(timeout, unit);
+  }
+
+  private <T> List<InstrumentedCallable<T>> instrument(Collection<? extends 
Callable<T>> tasks) {
+    List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size());
+    for (Callable<T> task : tasks) {
+      instrumented.add(new InstrumentedCallable<>(task));
+    }
+    return instrumented;
+  }
+
+  private class InstrumentedRunnable implements Runnable {
+    private final Runnable task;
+    private final MetricTimer idleTimer;
+
+    InstrumentedRunnable(Runnable task) {
+      this.task = task;
+      this.idleTimer = idle.start();
+    }
+
+    @Override
+    public void run() {
+      idleTimer.stop();
+      running.inc();
+
+      MetricTimer durationTimer = duration.start();
+      try {
+        task.run();
+      } finally {
+        durationTimer.stop();
+        running.dec();
+        completed.inc();
+      }
+    }
+  }
+
+  private class InstrumentedCallable<T> implements Callable<T> {
+    private final Callable<T> task;
+    private final MetricTimer idleTimer;
+
+    InstrumentedCallable(Callable<T> task) {
+      this.task = task;
+      this.idleTimer = idle.start();
+    }
+
+    @Override
+    public T call() throws Exception {
+      idleTimer.stop();
+      running.inc();
+
+      MetricTimer durationTimer = duration.start();
+      try {
+        return task.call();
+      } finally {
+        durationTimer.stop();
+        running.dec();
+        completed.inc();
+      }
+    }
+  }
+}
diff --git 
a/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java
 
b/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java
new file mode 100644
index 00000000000..2d7ad162e4f
--- /dev/null
+++ 
b/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.solr.util.stats;
+
+import static 
com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween;
+import static org.apache.solr.metrics.SolrMetricProducer.TYPE_ATTR;
+
+import io.prometheus.metrics.model.snapshots.CounterSnapshot;
+import 
io.prometheus.metrics.model.snapshots.CounterSnapshot.CounterDataPointSnapshot;
+import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
+import 
io.prometheus.metrics.model.snapshots.GaugeSnapshot.GaugeDataPointSnapshot;
+import io.prometheus.metrics.model.snapshots.HistogramSnapshot;
+import 
io.prometheus.metrics.model.snapshots.HistogramSnapshot.HistogramDataPointSnapshot;
+import io.prometheus.metrics.model.snapshots.MetricSnapshots;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import org.apache.solr.SolrTestCase;
+import org.apache.solr.core.SolrInfoBean;
+import org.apache.solr.metrics.SolrMetricManager;
+import org.apache.solr.metrics.SolrMetricsContext;
+import org.apache.solr.util.SolrMetricTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+
+public class OtelInstrumentedExecutorServiceTest extends SolrTestCase {
+  public static final int PARALLELISM = 10;
+  public static long EXEC_TIMEOUT = 1;
+  public static TimeUnit EXEC_TIMEOUT_UNITS = TimeUnit.SECONDS;
+  public static final String REGISTRY_NAME = "solr-test-otel-registry";
+  public static final String TAG_NAME = "solr-test-otel-tag";
+  public static final double DELTA = 1E-8;
+
+  public static SolrMetricsContext metricsContext;
+
+  @Before
+  public void setUpMetrics() {
+    metricsContext = new SolrMetricsContext(new SolrMetricManager(), 
REGISTRY_NAME, TAG_NAME);
+  }
+
+  @Test
+  public void taskCount() throws InterruptedException {
+    try (var exec = testExecutor("taskCount", 
Executors.newFixedThreadPool(PARALLELISM))) {
+      final int numTasks = 225;
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics =
+          
metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect();
+      GaugeSnapshot tasksRunning =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_tasks_running");
+      CounterSnapshot taskCounters =
+          SolrMetricTestUtils.getMetricSnapshot(
+              CounterSnapshot.class, metrics, "solr_executor_tasks");
+
+      GaugeDataPointSnapshot runningTasks = 
tasksRunning.getDataPoints().getFirst();
+      CounterDataPointSnapshot submittedTasks = getCounterData(taskCounters, 
"submitted");
+      CounterDataPointSnapshot completedTasks = getCounterData(taskCounters, 
"completed");
+
+      assertEquals(0.0, runningTasks.getValue(), DELTA);
+      assertEquals(numTasks, submittedTasks.getValue(), DELTA);
+      assertEquals(numTasks, completedTasks.getValue(), DELTA);
+    }
+  }
+
+  @Test
+  public void taskRandomCount() throws InterruptedException {
+    try (var exec = testExecutor("taskRandomCount", 
Executors.newFixedThreadPool(PARALLELISM))) {
+      final int numTasks = randomIntBetween(1, 500);
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics =
+          
metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect();
+      GaugeSnapshot tasksRunning =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_tasks_running");
+      CounterSnapshot taskCounters =
+          SolrMetricTestUtils.getMetricSnapshot(
+              CounterSnapshot.class, metrics, "solr_executor_tasks");
+
+      GaugeDataPointSnapshot runningTasks = 
tasksRunning.getDataPoints().getFirst();
+      CounterDataPointSnapshot submittedTasks = getCounterData(taskCounters, 
"submitted");
+      CounterDataPointSnapshot completedTasks = getCounterData(taskCounters, 
"completed");
+
+      assertEquals(0.0, runningTasks.getValue(), DELTA);
+      assertEquals(numTasks, submittedTasks.getValue(), DELTA);
+      assertEquals(numTasks, completedTasks.getValue(), DELTA);
+    }
+  }
+
+  @Test
+  public void taskTimers() throws InterruptedException {
+    try (var exec = testExecutor("taskTimers", 
Executors.newFixedThreadPool(PARALLELISM))) {
+      final long durationMs = 200;
+      final double durationDeltaMs = 10.0;
+      exec.submit(
+          () -> {
+            try {
+              Thread.sleep(durationMs);
+            } catch (InterruptedException e) {
+            }
+          });
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics =
+          
metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect();
+      HistogramSnapshot taskTimers =
+          metrics.stream()
+              .filter(
+                  m -> 
m.getMetadata().getPrometheusName().startsWith("solr_executor_task_times"))
+              .findFirst()
+              .map(HistogramSnapshot.class::cast)
+              .get();
+
+      HistogramDataPointSnapshot idleTimer =
+          taskTimers.getDataPoints().stream()
+              .filter(data -> 
data.getLabels().get(TYPE_ATTR.toString()).equals("idle"))
+              .findFirst()
+              .get();
+      HistogramDataPointSnapshot durationTimer =
+          taskTimers.getDataPoints().stream()
+              .filter(data -> 
data.getLabels().get(TYPE_ATTR.toString()).equals("duration"))
+              .findFirst()
+              .get();
+
+      assertTrue(TimeUnit.SECONDS.toMillis(5) > idleTimer.getSum());
+      assertEquals(durationMs, durationTimer.getSum(), durationDeltaMs);
+    }
+  }
+
+  @Test
+  public void threadPoolTasks() throws InterruptedException {
+    try (var exec = testExecutor("threadPoolTasks", 
Executors.newFixedThreadPool(PARALLELISM))) {
+      final int numTasks = 225;
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics =
+          
metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect();
+      GaugeSnapshot sizeGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_thread_pool_size");
+      GaugeSnapshot taskGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, "solr_executor_thread_pool_tasks");
+
+      GaugeDataPointSnapshot poolSize = getGaugeData(sizeGauges, "size");
+      GaugeDataPointSnapshot corePoolSize = getGaugeData(sizeGauges, "core");
+      GaugeDataPointSnapshot maxPoolSize = getGaugeData(sizeGauges, "max");
+
+      GaugeDataPointSnapshot activeTasks = getGaugeData(taskGauges, "active");
+      GaugeDataPointSnapshot completedTasks = getGaugeData(taskGauges, 
"completed");
+      GaugeDataPointSnapshot queuedTasks = getGaugeData(taskGauges, "queued");
+      GaugeDataPointSnapshot poolCapacity = getGaugeData(taskGauges, 
"capacity");
+
+      assertEquals(PARALLELISM, poolSize.getValue(), DELTA);
+      assertEquals(PARALLELISM, corePoolSize.getValue(), DELTA);
+      assertEquals(PARALLELISM, maxPoolSize.getValue(), DELTA);
+
+      assertEquals(0.0, activeTasks.getValue(), DELTA);
+      assertEquals(numTasks, completedTasks.getValue(), DELTA);
+      assertEquals(0.0, queuedTasks.getValue(), DELTA);
+      assertEquals(Integer.MAX_VALUE, poolCapacity.getValue(), DELTA);
+    }
+  }
+
+  @Test
+  public void forkJoinPoolTasks() throws InterruptedException {
+    try (var exec = testExecutor("forkJoinPoolTasks", 
Executors.newWorkStealingPool(PARALLELISM))) {
+      final int numTasks = 225;
+      for (int i = 0; i < numTasks; ++i) {
+        exec.submit(() -> {});
+      }
+      exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS);
+
+      MetricSnapshots metrics =
+          
metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect();
+      GaugeSnapshot taskGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, 
"solr_executor_fork_join_pool_tasks");
+      GaugeSnapshot threadGauges =
+          SolrMetricTestUtils.getMetricSnapshot(
+              GaugeSnapshot.class, metrics, 
"solr_executor_fork_join_pool_threads");
+      GaugeDataPointSnapshot stolenTasks = getGaugeData(taskGauges, "stolen");
+      GaugeDataPointSnapshot queuedTasks = getGaugeData(taskGauges, "queued");
+
+      GaugeDataPointSnapshot activeThreads = getGaugeData(threadGauges, 
"active");
+      GaugeDataPointSnapshot runningThreads = getGaugeData(threadGauges, 
"running");
+
+      assertNotNull(stolenTasks.getValue());
+      assertEquals(0.0, queuedTasks.getValue(), DELTA);
+
+      assertEquals(0.0, activeThreads.getValue(), DELTA);
+      assertEquals(0.0, runningThreads.getValue(), DELTA);
+    }
+  }
+
+  private static ExecutorService testExecutor(String name, ExecutorService 
exec) {
+    return MetricUtils.instrumentedExecutorService(
+        exec, metricsContext, SolrInfoBean.Category.ADMIN, name);
+  }
+
+  private static CounterDataPointSnapshot getCounterData(CounterSnapshot 
snapshot, String type) {
+    return snapshot.getDataPoints().stream()
+        .filter(data -> 
data.getLabels().get(TYPE_ATTR.toString()).equals(type))
+        .findFirst()
+        .get();
+  }
+
+  private static GaugeDataPointSnapshot getGaugeData(GaugeSnapshot snapshot, 
String type) {
+    return snapshot.getDataPoints().stream()
+        .filter(data -> 
data.getLabels().get(TYPE_ATTR.toString()).equals(type))
+        .findFirst()
+        .get();
+  }
+}
diff --git 
a/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java 
b/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
index 8fcafc125c9..3531094cfc3 100644
--- a/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
+++ b/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java
@@ -25,6 +25,7 @@ import 
io.prometheus.metrics.model.snapshots.DataPointSnapshot;
 import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
 import io.prometheus.metrics.model.snapshots.HistogramSnapshot;
 import io.prometheus.metrics.model.snapshots.Labels;
+import io.prometheus.metrics.model.snapshots.MetricSnapshot;
 import io.prometheus.metrics.model.snapshots.MetricSnapshots;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -210,6 +211,15 @@ public final class SolrMetricTestUtils {
     return 
container.getMetricManager().getPrometheusMetricReader(registryName);
   }
 
+  public static <S extends MetricSnapshot> S getMetricSnapshot(
+      Class<S> snapshotClass, MetricSnapshots metrics, String name) {
+    return metrics.stream()
+        .filter(m -> m.getMetadata().getPrometheusName().equals(name))
+        .map(snapshotClass::cast)
+        .findFirst()
+        .get();
+  }
+
   private static <T> T getDatapoint(
       SolrCore core, String metricName, Labels labels, Class<T> snapshotType) {
     var reader = getPrometheusMetricReader(core);

Reply via email to