This is an automated email from the ASF dual-hosted git repository. mlbiscoc pushed a commit to branch feature/SOLR-17458-rebased in repository https://gitbox.apache.org/repos/asf/solr.git
commit 63f8fccf2599f1f637c66d116a66f92aaee9aadb Author: Jude Muriithi <[email protected]> AuthorDate: Wed Sep 17 16:16:33 2025 -0400 SOLR-17806: Migrate thread pool metrics to OTEL (#3519) * Stub out OtelInstrumentedeExecutorService * Finish OTEL ExecutorService implementation and replace MetricsUtil calls * Add OtelUnits to timers * spotlessApply * feedback * Use IOUtils.closeQuietly * Check in partial tests * Add pool metrics testing * use utility method in tests --------- Co-authored-by: jmuriithi3 <[email protected]> --- .../java/org/apache/solr/core/CoreContainer.java | 7 +- .../solr/handler/admin/CoreAdminHandler.java | 16 +- .../handler/component/HttpShardHandlerFactory.java | 6 +- .../org/apache/solr/update/UpdateShardHandler.java | 12 +- .../org/apache/solr/util/stats/MetricUtils.java | 23 +- .../stats/OtelInstrumentedExecutorService.java | 302 +++++++++++++++++++++ .../stats/OtelInstrumentedExecutorServiceTest.java | 238 ++++++++++++++++ .../org/apache/solr/util/SolrMetricTestUtils.java | 10 + 8 files changed, 572 insertions(+), 42 deletions(-) diff --git a/solr/core/src/java/org/apache/solr/core/CoreContainer.java b/solr/core/src/java/org/apache/solr/core/CoreContainer.java index 808ee59f6bb..2da7fc9ef91 100644 --- a/solr/core/src/java/org/apache/solr/core/CoreContainer.java +++ b/solr/core/src/java/org/apache/solr/core/CoreContainer.java @@ -1040,10 +1040,9 @@ public class CoreContainer { ExecutorUtil.newMDCAwareFixedThreadPool( cfg.getCoreLoadThreadCount(isZooKeeperAware()), new SolrNamedThreadFactory("coreLoadExecutor")), - null, - metricManager.registry(SolrMetricManager.getRegistryName(SolrInfoBean.Group.node)), - SolrMetricManager.mkName( - "coreLoadExecutor", SolrInfoBean.Category.CONTAINER.toString(), "threadPool")); + solrMetricsContext, + SolrInfoBean.Category.CONTAINER, + "coreLoadExecutor"); coreSorter = loader.newInstance( diff --git a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java index b0237978593..41306919693 100644 --- a/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java +++ b/solr/core/src/java/org/apache/solr/handler/admin/CoreAdminHandler.java @@ -74,7 +74,6 @@ import org.apache.solr.handler.admin.api.SplitCoreAPI; import org.apache.solr.handler.admin.api.SwapCores; import org.apache.solr.handler.admin.api.UnloadCore; import org.apache.solr.logging.MDCLoggingContext; -import org.apache.solr.metrics.SolrMetricManager; import org.apache.solr.metrics.SolrMetricsContext; import org.apache.solr.request.SolrQueryRequest; import org.apache.solr.response.SolrQueryResponse; @@ -128,7 +127,6 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa + "it is a special Handler configured directly by the RequestDispatcher"); } - // TODO SOLR-17458: Migrate to Otel @Override public void initializeMetrics( SolrMetricsContext parentContext, Attributes attributes, String scope) { @@ -136,18 +134,16 @@ public class CoreAdminHandler extends RequestHandlerBase implements PermissionNa coreAdminAsyncTracker.standardExecutor = MetricUtils.instrumentedExecutorService( coreAdminAsyncTracker.standardExecutor, - this, - solrMetricsContext.getMetricRegistry(), - SolrMetricManager.mkName( - "parallelCoreAdminExecutor", getCategory().name(), scope, "threadPool")); + solrMetricsContext, + getCategory(), + "parallelCoreAdminExecutor"); coreAdminAsyncTracker.expensiveExecutor = MetricUtils.instrumentedExecutorService( coreAdminAsyncTracker.expensiveExecutor, - this, - solrMetricsContext.getMetricRegistry(), - SolrMetricManager.mkName( - "parallelCoreExpensiveAdminExecutor", getCategory().name(), scope, "threadPool")); + solrMetricsContext, + getCategory(), + "parallelCoreExpensiveAdminExecutor"); } @Override diff --git a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java index dd3630ea203..87c18486ba7 100644 --- a/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java +++ b/solr/core/src/java/org/apache/solr/handler/component/HttpShardHandlerFactory.java @@ -444,13 +444,9 @@ public class HttpShardHandlerFactory extends ShardHandlerFactory SolrMetricsContext parentContext, Attributes attributes, String scope) { solrMetricsContext = parentContext.getChildContext(this); String expandedScope = SolrMetricManager.mkName(scope, SolrInfoBean.Category.QUERY.name()); - // TODO SOLR-17458: Add Otel httpListenerFactory.initializeMetrics(solrMetricsContext, Attributes.empty(), expandedScope); commExecutor = MetricUtils.instrumentedExecutorService( - commExecutor, - null, - solrMetricsContext.getMetricRegistry(), - SolrMetricManager.mkName("httpShardExecutor", expandedScope, "threadPool")); + commExecutor, solrMetricsContext, SolrInfoBean.Category.QUERY, "httpShardExecutor"); } } diff --git a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java index 157fc1513b0..c0d4715154a 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateShardHandler.java @@ -163,26 +163,18 @@ public class UpdateShardHandler implements SolrInfoBean { return this.getClass().getName(); } - // TODO SOLR-17458: Add Otel @Override public void initializeMetrics( SolrMetricsContext parentContext, Attributes attributes, String scope) { solrMetricsContext = parentContext.getChildContext(this); String expandedScope = SolrMetricManager.mkName(scope, getCategory().name()); - // TODO SOLR-17458: Add Otel trackHttpSolrMetrics.initializeMetrics(solrMetricsContext, Attributes.empty(), expandedScope); updateExecutor = MetricUtils.instrumentedExecutorService( - updateExecutor, - this, - solrMetricsContext.getMetricRegistry(), - SolrMetricManager.mkName("updateOnlyExecutor", expandedScope, "threadPool")); + updateExecutor, solrMetricsContext, getCategory(), "updateOnlyExecutor"); recoveryExecutor = MetricUtils.instrumentedExecutorService( - recoveryExecutor, - this, - solrMetricsContext.getMetricRegistry(), - SolrMetricManager.mkName("recoveryExecutor", expandedScope, "threadPool")); + recoveryExecutor, solrMetricsContext, getCategory(), "recoveryExecutor"); } @Override diff --git a/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java b/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java index 9cff065c06f..3447e3e6f0a 100644 --- a/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java +++ b/solr/core/src/java/org/apache/solr/util/stats/MetricUtils.java @@ -19,7 +19,6 @@ package org.apache.solr.util.stats; import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; import com.codahale.metrics.Histogram; -import com.codahale.metrics.InstrumentedExecutorService; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricFilter; @@ -56,6 +55,7 @@ import org.apache.solr.common.util.NamedList; import org.apache.solr.core.SolrInfoBean; import org.apache.solr.metrics.AggregateMetric; import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -744,18 +744,6 @@ public class MetricUtils { } } - /** Returns an instrumented wrapper over the given executor service. */ - public static ExecutorService instrumentedExecutorService( - ExecutorService delegate, SolrInfoBean info, MetricRegistry metricRegistry, String scope) { - if (info != null && info.getSolrMetricsContext() != null) { - info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, "submitted")); - info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, "running")); - info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, "completed")); - info.getSolrMetricsContext().registerMetricName(MetricRegistry.name(scope, "duration")); - } - return new InstrumentedExecutorService(delegate, metricRegistry, scope); - } - /** * Creates a set of metrics (gauges) that correspond to available bean properties for the provided * MXBean. @@ -829,6 +817,15 @@ public class MetricUtils { "com.ibm.lang.management.OperatingSystemMXBean" }; + /** Returns an instrumented wrapper over the given executor service. */ + public static ExecutorService instrumentedExecutorService( + ExecutorService delegate, + SolrMetricsContext ctx, + SolrInfoBean.Category category, + String name) { + return new OtelInstrumentedExecutorService(delegate, ctx, category, name); + } + /** * Creates a set of metrics (gauges) that correspond to available bean properties for the provided * MXBean. diff --git a/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java b/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java new file mode 100644 index 00000000000..2823826d4b5 --- /dev/null +++ b/solr/core/src/java/org/apache/solr/util/stats/OtelInstrumentedExecutorService.java @@ -0,0 +1,302 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.util.stats; + +import static org.apache.solr.metrics.SolrMetricProducer.CATEGORY_ATTR; +import static org.apache.solr.metrics.SolrMetricProducer.TYPE_ATTR; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import org.apache.solr.common.util.IOUtils; +import org.apache.solr.core.SolrInfoBean; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.metrics.otel.OtelUnit; +import org.apache.solr.metrics.otel.instruments.AttributedLongCounter; +import org.apache.solr.metrics.otel.instruments.AttributedLongTimer; +import org.apache.solr.metrics.otel.instruments.AttributedLongTimer.MetricTimer; +import org.apache.solr.metrics.otel.instruments.AttributedLongUpDownCounter; + +/** + * OTEL instrumentation wrapper around {@link ExecutorService}. Based on {@link + * com.codahale.metrics.InstrumentedExecutorService}. + */ +public class OtelInstrumentedExecutorService implements ExecutorService { + public static final AttributeKey<String> EXECUTOR_NAME_ATTR = + AttributeKey.stringKey("executor_name"); + + private final ExecutorService delegate; + private final String executorName; + private final List<AutoCloseable> observableMetrics; + private final AttributedLongCounter submitted; + private final AttributedLongUpDownCounter running; + private final AttributedLongCounter completed; + private final AttributedLongTimer idle; + private final AttributedLongTimer duration; + + public OtelInstrumentedExecutorService( + ExecutorService delegate, + SolrMetricsContext ctx, + SolrInfoBean.Category category, + String executorName) { + this.delegate = delegate; + this.executorName = executorName; + this.observableMetrics = new ArrayList<>(); + + Attributes attrs = + Attributes.builder() + .put(CATEGORY_ATTR, category.toString()) + .put(EXECUTOR_NAME_ATTR, executorName) + .build(); + + // Each metric type needs a separate name to avoid obscuring other types + var executorTaskCounter = + ctx.longCounter("solr_executor_tasks", "Number of ExecutorService tasks"); + this.submitted = + new AttributedLongCounter( + executorTaskCounter, attrs.toBuilder().put(TYPE_ATTR, "submitted").build()); + this.completed = + new AttributedLongCounter( + executorTaskCounter, attrs.toBuilder().put(TYPE_ATTR, "completed").build()); + this.running = + new AttributedLongUpDownCounter( + ctx.longUpDownCounter( + "solr_executor_tasks_running", "Number of running ExecutorService tasks"), + attrs); + var executorTaskTimer = + ctx.longHistogram( + "solr_executor_task_times", "Timing of ExecutorService tasks", OtelUnit.MILLISECONDS); + this.idle = + new AttributedLongTimer( + executorTaskTimer, attrs.toBuilder().put(TYPE_ATTR, "idle").build()); + this.duration = + new AttributedLongTimer( + executorTaskTimer, attrs.toBuilder().put(TYPE_ATTR, "duration").build()); + + if (delegate instanceof ThreadPoolExecutor) { + ThreadPoolExecutor threadPool = (ThreadPoolExecutor) delegate; + observableMetrics.add( + ctx.observableLongGauge( + "solr_executor_thread_pool_size", + "Thread pool size", + measurement -> { + measurement.record( + threadPool.getPoolSize(), attrs.toBuilder().put(TYPE_ATTR, "size").build()); + measurement.record( + threadPool.getCorePoolSize(), attrs.toBuilder().put(TYPE_ATTR, "core").build()); + measurement.record( + threadPool.getMaximumPoolSize(), + attrs.toBuilder().put(TYPE_ATTR, "max").build()); + })); + + final BlockingQueue<Runnable> taskQueue = threadPool.getQueue(); + observableMetrics.add( + ctx.observableLongGauge( + "solr_executor_thread_pool_tasks", + "Thread pool task counts", + measurement -> { + measurement.record( + threadPool.getActiveCount(), + attrs.toBuilder().put(TYPE_ATTR, "active").build()); + measurement.record( + threadPool.getCompletedTaskCount(), + attrs.toBuilder().put(TYPE_ATTR, "completed").build()); + measurement.record( + taskQueue.size(), attrs.toBuilder().put(TYPE_ATTR, "queued").build()); + measurement.record( + taskQueue.remainingCapacity(), + attrs.toBuilder().put(TYPE_ATTR, "capacity").build()); + })); + } else if (delegate instanceof ForkJoinPool) { + ForkJoinPool forkJoinPool = (ForkJoinPool) delegate; + observableMetrics.add( + ctx.observableLongGauge( + "solr_executor_fork_join_pool_tasks", + "Fork join pool task counts", + measurement -> { + measurement.record( + forkJoinPool.getStealCount(), + attrs.toBuilder().put(TYPE_ATTR, "stolen").build()); + measurement.record( + forkJoinPool.getQueuedTaskCount(), + attrs.toBuilder().put(TYPE_ATTR, "queued").build()); + })); + observableMetrics.add( + ctx.observableLongGauge( + "solr_executor_fork_join_pool_threads", + "Fork join pool thread counts", + measurement -> { + measurement.record( + forkJoinPool.getActiveThreadCount(), + attrs.toBuilder().put(TYPE_ATTR, "active").build()); + measurement.record( + forkJoinPool.getRunningThreadCount(), + attrs.toBuilder().put(TYPE_ATTR, "running").build()); + })); + } + } + + @Override + public void execute(Runnable task) { + submitted.inc(); + delegate.execute(new InstrumentedRunnable(task)); + } + + @Override + public Future<?> submit(Runnable task) { + submitted.inc(); + return delegate.submit(new InstrumentedRunnable(task)); + } + + @Override + public <T> Future<T> submit(Runnable task, T result) { + submitted.inc(); + return delegate.submit(new InstrumentedRunnable(task), result); + } + + @Override + public <T> Future<T> submit(Callable<T> task) { + submitted.inc(); + return delegate.submit(new InstrumentedCallable<>(task)); + } + + @Override + public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) + throws InterruptedException { + submitted.add(Long.valueOf(tasks.size())); + return delegate.invokeAll(instrument(tasks)); + } + + @Override + public <T> List<Future<T>> invokeAll( + Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException { + submitted.add(Long.valueOf(tasks.size())); + return delegate.invokeAll(instrument(tasks), timeout, unit); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks) + throws InterruptedException, ExecutionException { + submitted.add(Long.valueOf(tasks.size())); + return delegate.invokeAny(instrument(tasks)); + } + + @Override + public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + submitted.add(Long.valueOf(tasks.size())); + return delegate.invokeAny(instrument(tasks), timeout, unit); + } + + @Override + public void shutdown() { + delegate.shutdown(); + observableMetrics.stream().forEach(IOUtils::closeQuietly); + } + + @Override + public List<Runnable> shutdownNow() { + List<Runnable> tasks = delegate.shutdownNow(); + observableMetrics.stream().forEach(IOUtils::closeQuietly); + return tasks; + } + + @Override + public boolean isShutdown() { + return delegate.isShutdown(); + } + + @Override + public boolean isTerminated() { + return delegate.isTerminated(); + } + + @Override + public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { + return delegate.awaitTermination(timeout, unit); + } + + private <T> List<InstrumentedCallable<T>> instrument(Collection<? extends Callable<T>> tasks) { + List<InstrumentedCallable<T>> instrumented = new ArrayList<>(tasks.size()); + for (Callable<T> task : tasks) { + instrumented.add(new InstrumentedCallable<>(task)); + } + return instrumented; + } + + private class InstrumentedRunnable implements Runnable { + private final Runnable task; + private final MetricTimer idleTimer; + + InstrumentedRunnable(Runnable task) { + this.task = task; + this.idleTimer = idle.start(); + } + + @Override + public void run() { + idleTimer.stop(); + running.inc(); + + MetricTimer durationTimer = duration.start(); + try { + task.run(); + } finally { + durationTimer.stop(); + running.dec(); + completed.inc(); + } + } + } + + private class InstrumentedCallable<T> implements Callable<T> { + private final Callable<T> task; + private final MetricTimer idleTimer; + + InstrumentedCallable(Callable<T> task) { + this.task = task; + this.idleTimer = idle.start(); + } + + @Override + public T call() throws Exception { + idleTimer.stop(); + running.inc(); + + MetricTimer durationTimer = duration.start(); + try { + return task.call(); + } finally { + durationTimer.stop(); + running.dec(); + completed.inc(); + } + } + } +} diff --git a/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java b/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java new file mode 100644 index 00000000000..2d7ad162e4f --- /dev/null +++ b/solr/core/src/test/org/apache/solr/util/stats/OtelInstrumentedExecutorServiceTest.java @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.solr.util.stats; + +import static com.carrotsearch.randomizedtesting.RandomizedTest.randomIntBetween; +import static org.apache.solr.metrics.SolrMetricProducer.TYPE_ATTR; + +import io.prometheus.metrics.model.snapshots.CounterSnapshot; +import io.prometheus.metrics.model.snapshots.CounterSnapshot.CounterDataPointSnapshot; +import io.prometheus.metrics.model.snapshots.GaugeSnapshot; +import io.prometheus.metrics.model.snapshots.GaugeSnapshot.GaugeDataPointSnapshot; +import io.prometheus.metrics.model.snapshots.HistogramSnapshot; +import io.prometheus.metrics.model.snapshots.HistogramSnapshot.HistogramDataPointSnapshot; +import io.prometheus.metrics.model.snapshots.MetricSnapshots; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.apache.solr.SolrTestCase; +import org.apache.solr.core.SolrInfoBean; +import org.apache.solr.metrics.SolrMetricManager; +import org.apache.solr.metrics.SolrMetricsContext; +import org.apache.solr.util.SolrMetricTestUtils; +import org.junit.Before; +import org.junit.Test; + +public class OtelInstrumentedExecutorServiceTest extends SolrTestCase { + public static final int PARALLELISM = 10; + public static long EXEC_TIMEOUT = 1; + public static TimeUnit EXEC_TIMEOUT_UNITS = TimeUnit.SECONDS; + public static final String REGISTRY_NAME = "solr-test-otel-registry"; + public static final String TAG_NAME = "solr-test-otel-tag"; + public static final double DELTA = 1E-8; + + public static SolrMetricsContext metricsContext; + + @Before + public void setUpMetrics() { + metricsContext = new SolrMetricsContext(new SolrMetricManager(), REGISTRY_NAME, TAG_NAME); + } + + @Test + public void taskCount() throws InterruptedException { + try (var exec = testExecutor("taskCount", Executors.newFixedThreadPool(PARALLELISM))) { + final int numTasks = 225; + for (int i = 0; i < numTasks; ++i) { + exec.submit(() -> {}); + } + exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS); + + MetricSnapshots metrics = + metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect(); + GaugeSnapshot tasksRunning = + SolrMetricTestUtils.getMetricSnapshot( + GaugeSnapshot.class, metrics, "solr_executor_tasks_running"); + CounterSnapshot taskCounters = + SolrMetricTestUtils.getMetricSnapshot( + CounterSnapshot.class, metrics, "solr_executor_tasks"); + + GaugeDataPointSnapshot runningTasks = tasksRunning.getDataPoints().getFirst(); + CounterDataPointSnapshot submittedTasks = getCounterData(taskCounters, "submitted"); + CounterDataPointSnapshot completedTasks = getCounterData(taskCounters, "completed"); + + assertEquals(0.0, runningTasks.getValue(), DELTA); + assertEquals(numTasks, submittedTasks.getValue(), DELTA); + assertEquals(numTasks, completedTasks.getValue(), DELTA); + } + } + + @Test + public void taskRandomCount() throws InterruptedException { + try (var exec = testExecutor("taskRandomCount", Executors.newFixedThreadPool(PARALLELISM))) { + final int numTasks = randomIntBetween(1, 500); + for (int i = 0; i < numTasks; ++i) { + exec.submit(() -> {}); + } + exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS); + + MetricSnapshots metrics = + metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect(); + GaugeSnapshot tasksRunning = + SolrMetricTestUtils.getMetricSnapshot( + GaugeSnapshot.class, metrics, "solr_executor_tasks_running"); + CounterSnapshot taskCounters = + SolrMetricTestUtils.getMetricSnapshot( + CounterSnapshot.class, metrics, "solr_executor_tasks"); + + GaugeDataPointSnapshot runningTasks = tasksRunning.getDataPoints().getFirst(); + CounterDataPointSnapshot submittedTasks = getCounterData(taskCounters, "submitted"); + CounterDataPointSnapshot completedTasks = getCounterData(taskCounters, "completed"); + + assertEquals(0.0, runningTasks.getValue(), DELTA); + assertEquals(numTasks, submittedTasks.getValue(), DELTA); + assertEquals(numTasks, completedTasks.getValue(), DELTA); + } + } + + @Test + public void taskTimers() throws InterruptedException { + try (var exec = testExecutor("taskTimers", Executors.newFixedThreadPool(PARALLELISM))) { + final long durationMs = 200; + final double durationDeltaMs = 10.0; + exec.submit( + () -> { + try { + Thread.sleep(durationMs); + } catch (InterruptedException e) { + } + }); + exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS); + + MetricSnapshots metrics = + metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect(); + HistogramSnapshot taskTimers = + metrics.stream() + .filter( + m -> m.getMetadata().getPrometheusName().startsWith("solr_executor_task_times")) + .findFirst() + .map(HistogramSnapshot.class::cast) + .get(); + + HistogramDataPointSnapshot idleTimer = + taskTimers.getDataPoints().stream() + .filter(data -> data.getLabels().get(TYPE_ATTR.toString()).equals("idle")) + .findFirst() + .get(); + HistogramDataPointSnapshot durationTimer = + taskTimers.getDataPoints().stream() + .filter(data -> data.getLabels().get(TYPE_ATTR.toString()).equals("duration")) + .findFirst() + .get(); + + assertTrue(TimeUnit.SECONDS.toMillis(5) > idleTimer.getSum()); + assertEquals(durationMs, durationTimer.getSum(), durationDeltaMs); + } + } + + @Test + public void threadPoolTasks() throws InterruptedException { + try (var exec = testExecutor("threadPoolTasks", Executors.newFixedThreadPool(PARALLELISM))) { + final int numTasks = 225; + for (int i = 0; i < numTasks; ++i) { + exec.submit(() -> {}); + } + exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS); + + MetricSnapshots metrics = + metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect(); + GaugeSnapshot sizeGauges = + SolrMetricTestUtils.getMetricSnapshot( + GaugeSnapshot.class, metrics, "solr_executor_thread_pool_size"); + GaugeSnapshot taskGauges = + SolrMetricTestUtils.getMetricSnapshot( + GaugeSnapshot.class, metrics, "solr_executor_thread_pool_tasks"); + + GaugeDataPointSnapshot poolSize = getGaugeData(sizeGauges, "size"); + GaugeDataPointSnapshot corePoolSize = getGaugeData(sizeGauges, "core"); + GaugeDataPointSnapshot maxPoolSize = getGaugeData(sizeGauges, "max"); + + GaugeDataPointSnapshot activeTasks = getGaugeData(taskGauges, "active"); + GaugeDataPointSnapshot completedTasks = getGaugeData(taskGauges, "completed"); + GaugeDataPointSnapshot queuedTasks = getGaugeData(taskGauges, "queued"); + GaugeDataPointSnapshot poolCapacity = getGaugeData(taskGauges, "capacity"); + + assertEquals(PARALLELISM, poolSize.getValue(), DELTA); + assertEquals(PARALLELISM, corePoolSize.getValue(), DELTA); + assertEquals(PARALLELISM, maxPoolSize.getValue(), DELTA); + + assertEquals(0.0, activeTasks.getValue(), DELTA); + assertEquals(numTasks, completedTasks.getValue(), DELTA); + assertEquals(0.0, queuedTasks.getValue(), DELTA); + assertEquals(Integer.MAX_VALUE, poolCapacity.getValue(), DELTA); + } + } + + @Test + public void forkJoinPoolTasks() throws InterruptedException { + try (var exec = testExecutor("forkJoinPoolTasks", Executors.newWorkStealingPool(PARALLELISM))) { + final int numTasks = 225; + for (int i = 0; i < numTasks; ++i) { + exec.submit(() -> {}); + } + exec.awaitTermination(EXEC_TIMEOUT, EXEC_TIMEOUT_UNITS); + + MetricSnapshots metrics = + metricsContext.getMetricManager().getPrometheusMetricReader(REGISTRY_NAME).collect(); + GaugeSnapshot taskGauges = + SolrMetricTestUtils.getMetricSnapshot( + GaugeSnapshot.class, metrics, "solr_executor_fork_join_pool_tasks"); + GaugeSnapshot threadGauges = + SolrMetricTestUtils.getMetricSnapshot( + GaugeSnapshot.class, metrics, "solr_executor_fork_join_pool_threads"); + GaugeDataPointSnapshot stolenTasks = getGaugeData(taskGauges, "stolen"); + GaugeDataPointSnapshot queuedTasks = getGaugeData(taskGauges, "queued"); + + GaugeDataPointSnapshot activeThreads = getGaugeData(threadGauges, "active"); + GaugeDataPointSnapshot runningThreads = getGaugeData(threadGauges, "running"); + + assertNotNull(stolenTasks.getValue()); + assertEquals(0.0, queuedTasks.getValue(), DELTA); + + assertEquals(0.0, activeThreads.getValue(), DELTA); + assertEquals(0.0, runningThreads.getValue(), DELTA); + } + } + + private static ExecutorService testExecutor(String name, ExecutorService exec) { + return MetricUtils.instrumentedExecutorService( + exec, metricsContext, SolrInfoBean.Category.ADMIN, name); + } + + private static CounterDataPointSnapshot getCounterData(CounterSnapshot snapshot, String type) { + return snapshot.getDataPoints().stream() + .filter(data -> data.getLabels().get(TYPE_ATTR.toString()).equals(type)) + .findFirst() + .get(); + } + + private static GaugeDataPointSnapshot getGaugeData(GaugeSnapshot snapshot, String type) { + return snapshot.getDataPoints().stream() + .filter(data -> data.getLabels().get(TYPE_ATTR.toString()).equals(type)) + .findFirst() + .get(); + } +} diff --git a/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java b/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java index 8fcafc125c9..3531094cfc3 100644 --- a/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java +++ b/solr/test-framework/src/java/org/apache/solr/util/SolrMetricTestUtils.java @@ -25,6 +25,7 @@ import io.prometheus.metrics.model.snapshots.DataPointSnapshot; import io.prometheus.metrics.model.snapshots.GaugeSnapshot; import io.prometheus.metrics.model.snapshots.HistogramSnapshot; import io.prometheus.metrics.model.snapshots.Labels; +import io.prometheus.metrics.model.snapshots.MetricSnapshot; import io.prometheus.metrics.model.snapshots.MetricSnapshots; import java.util.ArrayList; import java.util.HashMap; @@ -210,6 +211,15 @@ public final class SolrMetricTestUtils { return container.getMetricManager().getPrometheusMetricReader(registryName); } + public static <S extends MetricSnapshot> S getMetricSnapshot( + Class<S> snapshotClass, MetricSnapshots metrics, String name) { + return metrics.stream() + .filter(m -> m.getMetadata().getPrometheusName().equals(name)) + .map(snapshotClass::cast) + .findFirst() + .get(); + } + private static <T> T getDatapoint( SolrCore core, String metricName, Labels labels, Class<T> snapshotType) { var reader = getPrometheusMetricReader(core);
