This is an automated email from the ASF dual-hosted git repository.
mkataria pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 16995af21f OAK-11716: Capture inference service stats (#2291)
16995af21f is described below
commit 16995af21f384a433286033e89ebc8934d19a1ba
Author: Mohit Kataria <[email protected]>
AuthorDate: Mon May 12 15:34:02 2025 +0530
OAK-11716: Capture inference service stats (#2291)
* OAK-11716: Capture inference service stats
* OAK-11716: Capture inference service stats
* OAK-11716: removed codahale dependency from pom
* OAK-11716: resolve sonarQube response
---
.../index/elastic/ElasticIndexProviderService.java | 2 +-
.../elastic/query/inference/InferenceConfig.java | 20 +-
.../elastic/query/inference/InferenceService.java | 2 +
.../query/inference/InferenceServiceManager.java | 16 +-
.../query/inference/InferenceServiceMetrics.java | 222 ++++++++++++++++++
.../inference/InferenceServiceUsingConfig.java | 25 +-
.../InferenceServiceUsingIndexConfig.java | 63 +++--
.../inference/ElasticInferenceUsingConfigTest.java | 224 ++++++++++++++----
.../inference/InferenceServiceMetricsTest.java | 256 +++++++++++++++++++++
9 files changed, 759 insertions(+), 71 deletions(-)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
index a53af22e10..f591d1e56a 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
@@ -190,7 +190,7 @@ public class ElasticIndexProviderService {
} else {
this.isInferenceEnabled = config.isInferenceEnabled();
}
- InferenceConfig.reInitialize(nodeStore, config.inferenceConfigPath(),
isInferenceEnabled);
+ InferenceConfig.reInitialize(nodeStore, statisticsProvider,
config.inferenceConfigPath(), isInferenceEnabled);
//initializeTextExtractionDir(bundleContext, config);
//initializeExtractedTextCache(config, statisticsProvider);
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceConfig.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceConfig.java
index 167dea0f67..7d0ae473c2 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceConfig.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceConfig.java
@@ -25,6 +25,7 @@ import org.apache.jackrabbit.oak.json.JsonUtils;
import org.apache.jackrabbit.oak.plugins.index.IndexName;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.state.NodeStore;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,11 +62,16 @@ public class InferenceConfig {
private String currentInferenceConfig;
private volatile String activeInferenceConfig;
private boolean isInferenceEnabled;
+ private StatisticsProvider statisticsProvider;
public boolean isInferenceEnabled() {
return isInferenceEnabled;
}
+ public StatisticsProvider getStatisticsProvider() {
+ return statisticsProvider;
+ }
+
/**
* Loads configuration from the given NodeState
*/
@@ -79,24 +85,29 @@ public class InferenceConfig {
currentInferenceConfig = activeInferenceConfig;
isInferenceEnabled = false;
enricherStatus = EnricherStatus.NOOP;
+ statisticsProvider = StatisticsProvider.NOOP;
} finally {
lock.writeLock().unlock();
}
}
+ public static void reInitialize(NodeStore nodeStore, StatisticsProvider
statisticsProvider, String inferenceConfigPath, boolean isInferenceEnabled) {
+ reInitialize(nodeStore, statisticsProvider, inferenceConfigPath,
isInferenceEnabled, true);
+ }
+
public static void reInitialize(NodeStore nodeStore, String
inferenceConfigPath, boolean isInferenceEnabled) {
- reInitialize(nodeStore, inferenceConfigPath, isInferenceEnabled, true);
+ reInitialize(nodeStore, StatisticsProvider.NOOP, inferenceConfigPath,
isInferenceEnabled, true);
}
public static void reInitialize() {
- reInitialize(INSTANCE.nodeStore, INSTANCE.inferenceConfigPath,
INSTANCE.isInferenceEnabled, true);
+ reInitialize(INSTANCE.nodeStore, INSTANCE.statisticsProvider,
INSTANCE.inferenceConfigPath, INSTANCE.isInferenceEnabled, true);
}
public static InferenceConfig getInstance() {
lock.readLock().lock();
try {
if (INSTANCE.activeInferenceConfig != null &&
!INSTANCE.activeInferenceConfig.equals(INSTANCE.currentInferenceConfig)) {
- reInitialize(INSTANCE.nodeStore, INSTANCE.inferenceConfigPath,
INSTANCE.isInferenceEnabled, false);
+ reInitialize(INSTANCE.nodeStore, INSTANCE.statisticsProvider,
INSTANCE.inferenceConfigPath, INSTANCE.isInferenceEnabled, false);
}
return INSTANCE;
} finally {
@@ -104,7 +115,7 @@ public class InferenceConfig {
}
}
- private static void reInitialize(NodeStore nodeStore, String
inferenceConfigPath, boolean isInferenceEnabled, boolean
updateActiveInferenceConfig) {
+ private static void reInitialize(NodeStore nodeStore, StatisticsProvider
statisticsProvider, String inferenceConfigPath, boolean isInferenceEnabled,
boolean updateActiveInferenceConfig) {
lock.writeLock().lock();
try {
if (updateActiveInferenceConfig) {
@@ -115,6 +126,7 @@ public class InferenceConfig {
INSTANCE.inferenceConfigPath = inferenceConfigPath;
INSTANCE.isInferenceEnabled = isInferenceEnabled;
INSTANCE.enricherStatus = new EnricherStatus(nodeStore,
inferenceConfigPath);
+ INSTANCE.statisticsProvider = statisticsProvider;
if (!isValidInferenceConfig(nodeStore, inferenceConfigPath)) {
INSTANCE.enabled = false;
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceService.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceService.java
index 3f1ace96a2..ff49fb8b12 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceService.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceService.java
@@ -21,6 +21,8 @@ package
org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
import java.util.List;
public interface InferenceService {
+ String DEFAULT_METRICS_LOGGING_INTERVAL_KEY =
"elastic.query.inference.LoggingInterval";
+ long DEFAULT_METRICS_LOGGING_INTERVAL =
Long.getLong(DEFAULT_METRICS_LOGGING_INTERVAL_KEY, 10L * 60L * 1000L); // 10
minutes
List<Float> embeddings(String text);
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceManager.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceManager.java
index a5b2fe389f..adf971e90e 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceManager.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceManager.java
@@ -33,8 +33,10 @@ public class InferenceServiceManager {
private static final String CACHE_SIZE_PROPERTY =
"oak.inference.cache.size";
private static final int CACHE_SIZE =
SystemPropertySupplier.create(CACHE_SIZE_PROPERTY, 100).get();
+ private static final int UNCACHED_SERVICE_CACHE_SIZE = 0;
private static final ConcurrentHashMap<String, InferenceService> SERVICES
= new ConcurrentHashMap<>();
+ private static final InferenceServiceMetrics uncachedServiceMetrics = new
InferenceServiceMetrics(InferenceConfig.getInstance().getStatisticsProvider(),
"UNCACHED_SERVICE", UNCACHED_SERVICE_CACHE_SIZE);
@Deprecated
public static InferenceService getInstance(@NotNull String url, String
model) {
@@ -43,23 +45,25 @@ public class InferenceServiceManager {
if (SERVICES.size() >= MAX_CACHED_SERVICES) {
LOGGER.warning("InferenceServiceManager maximum cached services
reached: " + MAX_CACHED_SERVICES);
LOGGER.warning("Returning a new InferenceService instance with no
cache");
- return new InferenceServiceUsingIndexConfig(url, 0);
+ return new InferenceServiceUsingIndexConfig(url,
UNCACHED_SERVICE_CACHE_SIZE, uncachedServiceMetrics);
}
- return SERVICES.computeIfAbsent(k, key -> new
InferenceServiceUsingIndexConfig(url, CACHE_SIZE));
+ return SERVICES.computeIfAbsent(k, key -> new
InferenceServiceUsingIndexConfig(url, CACHE_SIZE,
+ new
InferenceServiceMetrics(InferenceConfig.getInstance().getStatisticsProvider(),
k, CACHE_SIZE)));
}
public static InferenceService getInstance(InferenceModelConfig
inferenceModelConfig) {
//TODO we should use hash here, as hash takes care of all properties
in model config.
String key = inferenceModelConfig.getEmbeddingServiceUrl()
- + "|" + inferenceModelConfig.getInferenceModelConfigName()
- + "|" + inferenceModelConfig.getModel();
+ + "|" + inferenceModelConfig.getInferenceModelConfigName()
+ + "|" + inferenceModelConfig.getModel();
if (SERVICES.size() >= MAX_CACHED_SERVICES) {
LOGGER.warning("InferenceServiceManager maximum cached services
reached: " + MAX_CACHED_SERVICES);
LOGGER.warning("Returning a new InferenceService instance with no
cache");
- return new InferenceServiceUsingConfig(inferenceModelConfig);
+ return new InferenceServiceUsingConfig(inferenceModelConfig,
uncachedServiceMetrics);
}
- return SERVICES.computeIfAbsent(key, k -> new
InferenceServiceUsingConfig(inferenceModelConfig));
+ return SERVICES.computeIfAbsent(key, k -> new
InferenceServiceUsingConfig(inferenceModelConfig,
+ new
InferenceServiceMetrics(InferenceConfig.getInstance().getStatisticsProvider(),
k, inferenceModelConfig.getCacheSize())));
}
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceMetrics.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceMetrics.java
new file mode 100644
index 0000000000..92cccbe085
--- /dev/null
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceMetrics.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Collects and reports metrics for the inference service.
+ */
+public class InferenceServiceMetrics {
+ final static Logger LOG =
LoggerFactory.getLogger(InferenceServiceMetrics.class);
+
+ // Tracks the last time metrics were logged
+ private long lastLogTimeMillis;
+ private String metricsServiceKey;
+ private int cacheSize;
+ private StatisticsProvider statisticsProvider;
+
+ // Metric constants for both output property names and registry base names
+ public static final String TOTAL_REQUESTS = "INFERENCE_TOTAL_REQUESTS";
+ public static final String CACHE_HITS = "INFERENCE_CACHE_HITS";
+ public static final String CACHE_MISSES = "INFERENCE_CACHE_MISSES";
+ public static final String INFERENCE_CACHE_HIT_RATE =
"INFERENCE_CACHE_HIT_RATE";
+ public static final String INFERENCE_CACHE_SIZE = "INFERENCE_CACHE_SIZE";
+ public static final String INFERENCE_REQUEST_ERRORS =
"INFERENCE_REQUEST_ERRORS";
+ public static final String INFERENCE_ERROR_RATE = "INFERENCE_ERROR_RATE";
+ public static final String INFERENCE_REQUEST_TIMES =
"INFERENCE_REQUEST_TIMES";
+ public static final String INFERENCE_ERROR_TIMES = "INFERENCE_ERROR_TIMES";
+
+ /**
+ * Creates a new InferenceServiceMetrics instance
+ *
+ * @param statisticsProvider The statistics provider to use
+ * @param metricsServiceKey The service key for logging
+ * @param cacheSize The configured cache size
+ */
+ public InferenceServiceMetrics(StatisticsProvider statisticsProvider,
String metricsServiceKey, int cacheSize) {
+ this.lastLogTimeMillis = System.currentTimeMillis();
+ this.metricsServiceKey = metricsServiceKey;
+ this.cacheSize = cacheSize;
+ this.statisticsProvider = statisticsProvider;
+ }
+
+ /**
+ * Records a request start and returns a TimerStats.Context
+ *
+ * @return a TimerStats.Context that should be stopped when the request
completes
+ */
+ public TimerStats.Context requestStarted() {
+ getMeter(TOTAL_REQUESTS).mark();
+ // Start timing
+ return getTimer(INFERENCE_REQUEST_TIMES).time();
+ }
+
+ /**
+ * Records a cache hit
+ */
+ public void cacheHit() {
+ getMeter(CACHE_HITS).mark();
+ }
+
+ /**
+ * Records a cache miss
+ */
+ public void cacheMiss() {
+ getMeter(CACHE_MISSES).mark();
+ }
+
+ /**
+ * Records a request error
+ *
+ * @param timeMillis Time taken before the error occurred in milliseconds
+ * @param timerContext Timer context to stop, if available (can be null)
+ */
+ public void requestError(long timeMillis, TimerStats.Context timerContext)
{
+ getMeter(INFERENCE_REQUEST_ERRORS).mark();
+ // Stop the timer context if provided
+ if (timerContext != null) {
+ timerContext.stop();
+ }
+ LOG.debug("Request error occurred after {} ms for {}", timeMillis,
metricsServiceKey);
+ }
+
+ /**
+ * Records a request error without timing information
+ */
+ public void requestError() {
+ getMeter(INFERENCE_REQUEST_ERRORS).mark();
+ LOG.debug("Request error occurred (timing unknown) for {}",
metricsServiceKey);
+ }
+
+ /**
+ * Records a request completion
+ *
+ * @param timeMillis Time taken to complete the request in milliseconds
+ * @param timerContext Timer context to stop, if available (can be null)
+ */
+ public void requestCompleted(long timeMillis, TimerStats.Context
timerContext) {
+ // Stop timer context if provided
+ if (timerContext != null) {
+ timerContext.stop();
+ } else {
+ // If no context was provided, update the timer directly
+ getTimer(INFERENCE_REQUEST_TIMES).update(timeMillis,
TimeUnit.MILLISECONDS);
+ }
+ LOG.debug("Request completed in {} ms for {}", timeMillis,
metricsServiceKey);
+ }
+
+ /**
+ * Returns the cache hit rate percentage (0-100)
+ *
+ * @return The cache hit rate as a percentage
+ */
+ public double getCacheHitRate() {
+ long hits = getMeter(CACHE_HITS).getCount();
+ long misses = getMeter(CACHE_MISSES).getCount();
+ long total = hits + misses;
+ return total > 0 ? (hits * 100.0 / total) : 0.0;
+ }
+
+ /**
+ * Returns metrics as a map for monitoring
+ *
+ * @return A map of metric names to values
+ */
+ public Map<String, Object> getMetrics() {
+ Map<String, Object> metricsMap = new LinkedHashMap<>();
+
+ // Get base metrics
+ long total = getMeter(TOTAL_REQUESTS).getCount();
+ long hits = getMeter(CACHE_HITS).getCount();
+ long misses = getMeter(CACHE_MISSES).getCount();
+ long errors = getMeter(INFERENCE_REQUEST_ERRORS).getCount();
+ // Add to map
+ metricsMap.put("totalRequests", total);
+ metricsMap.put("cacheHits", hits);
+ metricsMap.put("cacheMisses", misses);
+ metricsMap.put("requestErrors", errors);
+ double hitRate = total > 0 ? (hits * 100.0 / total) : 0.0;
+ metricsMap.put(INFERENCE_CACHE_HIT_RATE, hitRate);
+ metricsMap.put(INFERENCE_CACHE_SIZE, cacheSize);
+ metricsMap.put(INFERENCE_ERROR_RATE, total > 0 ? (errors * 100.0 /
total) : 0.0);
+ return metricsMap;
+ }
+
+ /**
+ * Logs a summary of the current metrics
+ */
+ public void logMetricsSummary() {
+ logMetricsSummary(0);
+ }
+
+ /**
+ * Logs a summary of the current metrics if the interval has passed
+ *
+ * @param intervalMillis Minimum interval between logs in milliseconds
+ */
+ public void logMetricsSummary(long intervalMillis) {
+ // Skip if interval has not passed
+ if (lastLogTimeMillis + intervalMillis > System.currentTimeMillis()) {
+ return;
+ }
+ // Get metrics
+ Map<String, Object> metrics = getMetrics();
+ // Build log message
+ StringBuilder logMessage = new StringBuilder();
+ logMessage.append("Inference service metrics");
+ if (metricsServiceKey != null) {
+ logMessage.append(" for ServiceKey
'").append(metricsServiceKey).append("'");
+ }
+ logMessage.append(": requests=").append(metrics.get("totalRequests"))
+ .append(",
hitRate=").append(metrics.get(INFERENCE_CACHE_HIT_RATE)).append("%")
+ .append(",
errorRate=").append(metrics.get(INFERENCE_ERROR_RATE)).append("%");
+ // Log the message
+ lastLogTimeMillis = System.currentTimeMillis();
+ LOG.info(logMessage.toString());
+ }
+
+ private MeterStats getMeter(String name) {
+ return
statisticsProvider.getMeter(getMetricName(this.metricsServiceKey + ";" + name),
StatsOptions.DEFAULT);
+ }
+
+ private TimerStats getTimer(String name) {
+ return statisticsProvider.getTimer(this.metricsServiceKey + ";" +
getMetricName(name), StatsOptions.DEFAULT);
+ }
+
+ /**
+ * Returns the metric name to use with the StatisticsProvider.
+ * This method can be overridden by tests to provide unique metric names.
+ *
+ * @param baseName The base metric name
+ * @return The actual metric name to use
+ */
+ protected String getMetricName(String baseName) {
+ return baseName;
+ }
+}
\ No newline at end of file
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingConfig.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingConfig.java
index 7816a85ce8..adc418fd3a 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingConfig.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingConfig.java
@@ -18,9 +18,8 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.jackrabbit.oak.stats.TimerStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,9 +52,9 @@ public class InferenceServiceUsingConfig implements
InferenceService {
private final long timeoutMillis;
private final InferenceModelConfig inferenceModelConfig;
private final String[] headersValue;
+ private final InferenceServiceMetrics metrics;
-
- public InferenceServiceUsingConfig(InferenceModelConfig
inferenceModelConfig) {
+ public InferenceServiceUsingConfig(InferenceModelConfig
inferenceModelConfig, InferenceServiceMetrics metrics) {
try {
this.uri = new URI(inferenceModelConfig.getEmbeddingServiceUrl());
} catch (URISyntaxException e) {
@@ -65,6 +64,7 @@ public class InferenceServiceUsingConfig implements
InferenceService {
this.httpClient = HttpClient.newHttpClient();
this.timeoutMillis = inferenceModelConfig.getTimeoutMillis();
this.inferenceModelConfig = inferenceModelConfig;
+ this.metrics = metrics;
this.headersValue =
inferenceModelConfig.getHeader().getInferenceHeaderPayload()
.entrySet().stream()
.flatMap(e -> Stream.of(e.getKey(), e.getValue()))
@@ -78,10 +78,17 @@ public class InferenceServiceUsingConfig implements
InferenceService {
}
public List<Float> embeddings(String text, long timeoutMillis) {
+ // Track the request
+ TimerStats.Context timerContext = metrics.requestStarted();
+ long startTime = System.currentTimeMillis();
+
if (cache.containsKey(text)) {
+ metrics.cacheHit();
+ metrics.requestCompleted(System.currentTimeMillis() - startTime,
timerContext);
return cache.get(text);
}
+ metrics.cacheMiss();
List<Float> result = null;
try {
// Create the JSON payload.
@@ -108,13 +115,23 @@ public class InferenceServiceUsingConfig implements
InferenceService {
});
result = embeddingList;
cache.put(text, result);
+ metrics.requestCompleted(System.currentTimeMillis() -
startTime, timerContext);
return result;
+ } else {
+ metrics.requestError(System.currentTimeMillis() - startTime,
timerContext);
+ LOG.error("Failed to get embeddings. Status code: {},
Response: {}", response.statusCode(), response.body());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ metrics.requestError(System.currentTimeMillis() - startTime,
timerContext);
throw new InferenceServiceException("Failed to get embeddings", e);
} catch (IOException e) {
+ metrics.requestError(System.currentTimeMillis() - startTime,
timerContext);
throw new InferenceServiceException("Unable to extract embeddings
from inference service response", e);
+ } finally {
+ //TODO evaluate and update how often we want to log these stats.
+ // Setting it to log every 10 minutes by default and can be
configured using system property.
+ metrics.logMetricsSummary(DEFAULT_METRICS_LOGGING_INTERVAL);
}
return result;
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingIndexConfig.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingIndexConfig.java
index 03badd2b8a..48f2045748 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingIndexConfig.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceUsingIndexConfig.java
@@ -20,6 +20,9 @@ package
org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
@@ -38,15 +41,17 @@ import java.util.stream.Collectors;
* EXPERIMENTAL: A service that sends text to an inference service and
receives embeddings in return.
* The embeddings are cached to avoid repeated calls to the inference service.
*/
-public class InferenceServiceUsingIndexConfig implements InferenceService{
+public class InferenceServiceUsingIndexConfig implements InferenceService {
+ private static final Logger LOG =
LoggerFactory.getLogger(InferenceServiceUsingIndexConfig.class);
private static final ObjectMapper MAPPER = new ObjectMapper();
private final URI uri;
private final Cache<String, List<Float>> cache;
private final HttpClient httpClient;
+ private final InferenceServiceMetrics metrics;
- public InferenceServiceUsingIndexConfig(String url, int cacheSize) {
+ public InferenceServiceUsingIndexConfig(String url, int cacheSize,
InferenceServiceMetrics metrics) {
try {
this.uri = new URI(url);
} catch (URISyntaxException e) {
@@ -54,6 +59,7 @@ public class InferenceServiceUsingIndexConfig implements
InferenceService{
}
this.cache = new Cache<>(cacheSize);
this.httpClient = HttpClient.newHttpClient();
+ this.metrics = metrics;
}
@Override
@@ -62,49 +68,80 @@ public class InferenceServiceUsingIndexConfig implements
InferenceService{
}
public List<Float> embeddings(String text, long timeoutMillis) {
- if (cache.containsKey(text)) {
- return cache.get(text);
- }
+ TimerStats.Context timerContext = metrics.requestStarted();
+ long startTime = System.currentTimeMillis();
try {
+ if (cache.containsKey(text)) {
+ metrics.cacheHit();
+ return cache.get(text);
+ }
+
+ metrics.cacheMiss();
+
// Create the JSON payload.
String jsonInputString = "{\"text\":\"" + text + "\"}";
// Build the HttpRequest.
HttpRequest request = HttpRequest.newBuilder()
- .uri(uri)
- .timeout(java.time.Duration.ofMillis(timeoutMillis))
- .header("Content-Type", "application/json; utf-8")
- .POST(HttpRequest.BodyPublishers.ofString(jsonInputString,
StandardCharsets.UTF_8))
- .build();
+ .uri(uri)
+ .timeout(java.time.Duration.ofMillis(timeoutMillis))
+ .header("Content-Type", "application/json; utf-8")
+ .POST(HttpRequest.BodyPublishers.ofString(jsonInputString,
StandardCharsets.UTF_8))
+ .build();
// Send the request and get the response.
+ LOG.debug("Sending request to inference service: {}", uri);
HttpResponse<String> response = httpClient.send(request,
HttpResponse.BodyHandlers.ofString());
+ // Check response status
+ if (response.statusCode() != 200) {
+ metrics.requestError();
+ LOG.warn("Inference service returned non-200 status code: {} -
{}", response.statusCode(), response.body());
+ throw new InferenceServiceException("Inference service
returned status code: " + response.statusCode());
+ }
+
// Parse the response string into a JsonNode.
JsonNode jsonResponse = MAPPER.readTree(response.body());
// Extract the 'embedding' property.
JsonNode embedding = jsonResponse.get("embedding");
+ if (embedding == null) {
+ metrics.requestError();
+ LOG.warn("Inference service response did not contain
'embedding' property: {}", response.body());
+ throw new InferenceServiceException("Invalid response from
inference service: missing 'embedding' property");
+ }
+
double[] embeddings = MAPPER.treeToValue(embedding,
double[].class);
// Convert the array of doubles to a list of floats.
List<Float> result = Arrays.stream(embeddings)
- .mapToObj(d -> ((Double) d).floatValue())
- .collect(Collectors.toList());
+ .mapToObj(d -> ((Double) d).floatValue())
+ .collect(Collectors.toList());
cache.put(text, result);
+
+ LOG.debug("Successfully retrieved embeddings for text of length
{}", text.length());
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ metrics.requestError();
+ LOG.warn("Inference service request was interrupted", e);
throw new InferenceServiceException("Failed to get embeddings", e);
} catch (IOException e) {
+ metrics.requestError();
+ LOG.warn("Error communicating with inference service", e);
throw new InferenceServiceException("Unable to extract embeddings
from inference service response", e);
+ } finally {
+ long requestTime = System.currentTimeMillis() - startTime;
+ metrics.requestCompleted(requestTime, timerContext);
+ //TODO evaluate and update how often we want to log these stats.
+ // Setting it to log every 10 minutes by default and can be
configured using system property.
+ metrics.logMetricsSummary(DEFAULT_METRICS_LOGGING_INTERVAL);
}
}
-
private static class Cache<K, V> extends LinkedHashMap<K, V> {
private final int maxEntries;
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/ElasticInferenceUsingConfigTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/ElasticInferenceUsingConfigTest.java
index 06f5ab4beb..5df1eee832 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/ElasticInferenceUsingConfigTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/ElasticInferenceUsingConfigTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+import ch.qos.logback.classic.Level;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.json.JsonData;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -33,13 +34,22 @@ import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.Tree;
import org.apache.jackrabbit.oak.api.Type;
import org.apache.jackrabbit.oak.commons.PathUtils;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticAbstractQueryTest;
import
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URL;
@@ -52,16 +62,26 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConstants.ENRICHER_CONFIG;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConstants.TYPE;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
public class ElasticInferenceUsingConfigTest extends ElasticAbstractQueryTest {
+ private static final Logger LOG =
LoggerFactory.getLogger(ElasticInferenceUsingConfigTest.class);
+
+ private ScheduledExecutorService executorService;
+ private StatisticsProvider statisticsProvider;
+
@Rule
public WireMockRule wireMock = new
WireMockRule(WireMockConfiguration.options().dynamicPort());
@@ -69,6 +89,39 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
private final String defaultEnricherStatusMapping =
"{\"properties\":{\"processingTimeMs\":{\"type\":\"date\"},\"latestError\":{\"type\":\"keyword\",\"index\":false},\"errorCount\":{\"type\":\"short\"},\"status\":{\"type\":\"keyword\"}}}";
private final String defaultEnricherStatusData =
"{\"processingTimeMs\":0,\"latestError\":\"\",\"errorCount\":0,\"status\":\"PENDING\"}";
+ @Before
+ public void setUp() {
+ // Set system property for small metric logging interval
+ System.setProperty("oak.inference.metrics.log.interval", "100");
+
+ // Initialize StatisticsProvider for metrics testing
+ executorService = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "Statistics-Test-Thread-" +
UUID.randomUUID());
+ t.setDaemon(true);
+ return t;
+ });
+ statisticsProvider = new DefaultStatisticsProvider(executorService);
+ }
+
+ @After
+ public void tearDownStatistics() {
+ // Clear system property
+ System.clearProperty("oak.inference.metrics.log.interval");
+
+ if (executorService != null) {
+ executorService.shutdown();
+ try {
+ executorService.awaitTermination(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.warn("Interrupted while waiting for executor service to
terminate", e);
+ }
+ if (!executorService.isTerminated()) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
@Test
public void inferenceConfigStoredInIndexMetadata() throws
CommitFailedException, JsonProcessingException {
String indexName = UUID.randomUUID().toString();
@@ -124,11 +177,11 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
* Helper method to setup an inference model configuration.
*/
private void setupInferenceModelConfig(NodeBuilder inferenceIndexConfig,
- String configName, String modelName,
- String serviceUrl, double threshold,
- long minTerms, boolean isDefault,
boolean isEnabled,
- Map<String, String> headers,
- Map<String, Object> payloadConfig) {
+ String configName, String modelName,
+ String serviceUrl, double threshold,
+ long minTerms, boolean isDefault,
boolean isEnabled,
+ Map<String, String> headers,
+ Map<String, Object> payloadConfig) {
// Add inference model configuration
NodeBuilder modelConfig = inferenceIndexConfig.child(configName);
modelConfig.setProperty(InferenceConstants.TYPE,
InferenceModelConfig.TYPE);
@@ -166,46 +219,112 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
String inferenceModelConfigName = "ada-test-model";
String inferenceModelName = "text-embedding-ada-002";
- // Create inference config
- createInferenceConfig(jcrIndexName, true, defaultEnricherConfig,
inferenceModelConfigName,
- inferenceModelName, inferenceServiceUrl, 0.8, 1L, true, true);
- setupEnricherStatus(defaultEnricherStatusMapping,
defaultEnricherStatusData);
- // Create index definition with multiple properties
- IndexDefinitionBuilder builder = createIndexDefinition("title",
"description", "updatedBy");
- Tree index = setIndex(jcrIndexName, builder);
- root.commit();
+ // Setup log customizer to capture InferenceServiceMetrics logs
+ LogCustomizer logCustomizer = LogCustomizer
+ .forLogger(InferenceServiceMetrics.class.getName())
+ .enable(Level.INFO)
+ .enable(Level.DEBUG)
+ .contains("Inference service metrics")
+ .create();
+ logCustomizer.starting();
+
+ try {
+ // Create inference config
+ createInferenceConfig(jcrIndexName, true, defaultEnricherConfig,
inferenceModelConfigName,
+ inferenceModelName, inferenceServiceUrl, 0.8, 1L, true, true);
+ setupEnricherStatus(defaultEnricherStatusMapping,
defaultEnricherStatusData);
+ // Create index definition with multiple properties
+ IndexDefinitionBuilder builder = createIndexDefinition("title",
"description", "updatedBy");
+ Tree index = setIndex(jcrIndexName, builder);
+ root.commit();
+
+ // Add test content
+ addTestContent();
+
+ // Let the index catch up
+ assertEventually(() -> assertEquals(7, countDocuments(index)));
+
+ // Enrich documents with embeddings
+ setupEmbeddingsForContent(index, inferenceModelConfigName,
inferenceModelName);
+
+ // Setup wiremock stubs for inference service
+ setupMockInferenceService(inferenceModelConfigName, jcrIndexName);
+
+ // Test query results
+ Map<String, String> queryResults = Map.of(
+ "a beginner guide to data manipulation in python",
"/content/programming",
+ "how to improve mental health through exercises",
"/content/yoga",
+ "nutritional advice for a healthier lifestyle",
"/content/health",
+ "technological advancements in electric vehicles",
"/content/cars",
+ "what are the key algorithms used in machine learning",
"/content/ml"
+ );
+
+ // Verify all queries return expected results
+ assertEventually(() -> {
+ verifyQueryResults(queryResults, inferenceConfigInQuery,
jcrIndexName);
+
+ // Test error handling scenarios
+ verifyErrorHandling(jcrIndexName, inferenceConfigInQuery);
+ });
- // Add test content
- addTestContent();
+ // Test that inference data persists through document updates
+ testInferenceDataPersistenceOnUpdate(index);
- // Let the index catch up
- assertEventually(() -> assertEquals(7, countDocuments(index)));
+ // Create and verify metrics directly with our statisticsProvider
+ InferenceServiceMetrics directMetrics = new
InferenceServiceMetrics(statisticsProvider,
+ "test-metrics",
+ 100);
- // Enrich documents with embeddings
- setupEmbeddingsForContent(index, inferenceModelConfigName,
inferenceModelName);
+ // Set reasonable counter values
+ CounterStats counter =
statisticsProvider.getCounterStats("test-metrics_" +
InferenceServiceMetrics.TOTAL_REQUESTS,
+ StatsOptions.DEFAULT);
+ counter.inc(10);
- // Setup wiremock stubs for inference service
- setupMockInferenceService(inferenceModelConfigName, jcrIndexName);
+ // Log metrics with the counts
+ directMetrics.logMetricsSummary();
- // Test query results
- Map<String, String> queryResults = Map.of(
- "a beginner guide to data manipulation in python",
"/content/programming",
- "how to improve mental health through exercises", "/content/yoga",
- "nutritional advice for a healthier lifestyle", "/content/health",
- "technological advancements in electric vehicles", "/content/cars",
- "what are the key algorithms used in machine learning",
"/content/ml"
- );
+ LOG.info("Successfully logged basic metrics");
- // Verify all queries return expected results
- assertEventually(() -> {
- verifyQueryResults(queryResults, inferenceConfigInQuery,
jcrIndexName);
+ // Verify that we have captured the metrics logs
+ Thread.sleep(500); // Give a small delay for logging to complete
+ verifyMetricsLogsPresent(logCustomizer);
+ } finally {
+ logCustomizer.finished();
+ }
+ }
- // Test error handling scenarios
- verifyErrorHandling(jcrIndexName, inferenceConfigInQuery);
- });
+ /**
+ * Verifies that metrics logs were captured by the LogCustomizer.
+ *
+ * @param logCustomizer The LogCustomizer instance used to capture logs
+ */
+ private void verifyMetricsLogsPresent(LogCustomizer logCustomizer) {
+ List<String> logs = logCustomizer.getLogs();
+ assertFalse("Should have captured metrics logs", logs.isEmpty());
+
+ LOG.info("Captured {} metrics log entries", logs.size());
+
+ // At least one log should contain the metrics information
+ boolean foundMetricsLog = false;
+
+ for (String log : logs) {
+ if (log.contains("Inference service metrics")) {
+ foundMetricsLog = true;
+
+ // Verify it contains some of the expected metrics
+ assertTrue("Log should contain request count",
+ log.contains("requests="));
+ assertTrue("Log should contain cache hit rate",
+ log.contains("hitRate="));
+ assertTrue("Log should contain error rate",
+ log.contains("errorRate="));
+
+ LOG.info("Found metrics log: {}", log);
+ break;
+ }
+ }
- // Test that inference data persists through document updates
- testInferenceDataPersistenceOnUpdate(index);
+ assertTrue("Should have found at least one metrics log entry",
foundMetricsLog);
}
/**
@@ -381,10 +500,10 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
* Creates inference configuration with the specified parameters.
*/
private void createInferenceConfig(String indexName, boolean
isInferenceConfigEnabled,
- String enricherConfig, String
inferenceModelConfigName,
- String inferenceModelName, String
embeddingServiceUrl,
- Double similarityThreshold, long
minTerms, boolean isDefaultInferenceModelConfig,
- boolean isInferenceModelConfigEnabled)
throws CommitFailedException {
+ String enricherConfig, String
inferenceModelConfigName,
+ String inferenceModelName, String
embeddingServiceUrl,
+ Double similarityThreshold, long
minTerms, boolean isDefaultInferenceModelConfig,
+ boolean isInferenceModelConfigEnabled)
throws CommitFailedException {
NodeBuilder rootBuilder = nodeStore.getRoot().builder();
NodeBuilder nodeBuilder = rootBuilder;
for (String path : PathUtils.elements(INFERENCE_CONFIG_PATH)) {
@@ -569,13 +688,16 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
Tree content = root.getTree("/").addChild("content");
Tree document = content.addChild("document");
document.setProperty("title", "Test Document for Reinitialization");
+ Tree document2 = content.addChild("document2");
+ document2.setProperty("title", "Test Document for Reinitialization 2");
root.commit();
// Let the index catch up
- assertEventually(() -> assertEquals(2, countDocuments(index)));
+ assertEventually(() -> assertEquals(3, countDocuments(index)));
// Verify the enricher status in the indexed document
verifyEnricherStatus(index, "/content/document", updatedStatusData);
+ verifyEnricherStatus(index, "/content/document2", updatedStatusData);
}
/**
@@ -626,7 +748,7 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
* Creates a document with vector embeddings.
*/
private void createDocumentWithEmbeddings(Tree index, String path, String
inferenceModelConfigName,
- String inferenceModelName,
List<Float> embeddings) throws IOException {
+ String inferenceModelName,
List<Float> embeddings) throws IOException {
ObjectMapper mapper = new JsonMapper();
ObjectNode updateDoc = mapper.createObjectNode();
VectorDocument vectorDocument = new
VectorDocument(UUID.randomUUID().toString(), embeddings,
@@ -637,4 +759,20 @@ public class ElasticInferenceUsingConfigTest extends
ElasticAbstractQueryTest {
updateDocument(index, path, updateDoc);
}
+
+ /**
+ * Test metrics class that uses unique metric names to avoid conflicts
+ */
+ private static class TestMetricsWithUniqueNames extends
InferenceServiceMetrics {
+ private final String uniquePrefix = "test_" +
UUID.randomUUID().toString().replace("-", "_");
+
+ public TestMetricsWithUniqueNames(StatisticsProvider
statisticsProvider) {
+ super(statisticsProvider, "test-unique-metrics", 100);
+ }
+
+ @Override
+ protected String getMetricName(String baseName) {
+ return uniquePrefix + "_" + baseName;
+ }
+ }
}
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceMetricsTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceMetricsTest.java
new file mode 100644
index 0000000000..9cb0d46449
--- /dev/null
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/inference/InferenceServiceMetricsTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic.query.inference;
+
+import ch.qos.logback.classic.Level;
+import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
+import org.apache.jackrabbit.oak.stats.CounterStats;
+import org.apache.jackrabbit.oak.stats.DefaultStatisticsProvider;
+import org.apache.jackrabbit.oak.stats.MeterStats;
+import org.apache.jackrabbit.oak.stats.StatisticsProvider;
+import org.apache.jackrabbit.oak.stats.StatsOptions;
+import org.apache.jackrabbit.oak.stats.TimerStats;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for InferenceServiceMetrics.
+ */
+public class InferenceServiceMetricsTest {
+
+ private static final String TEST_SERVICE_KEY = "testService";
+ private static final int TEST_CACHE_SIZE = 100;
+ private LogCustomizer logCustomizer;
+ private ScheduledExecutorService executorService;
+ private StatisticsProvider statisticsProvider;
+
+ @Before
+ public void setUp() {
+ logCustomizer = LogCustomizer
+ .forLogger(InferenceServiceMetrics.class.getName())
+ .enable(Level.INFO)
+ .enable(Level.DEBUG)
+ .create();
+ logCustomizer.starting();
+
+ executorService = Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "Statistics-Test-Thread-" +
UUID.randomUUID());
+ t.setDaemon(true);
+ return t;
+ });
+ statisticsProvider = new DefaultStatisticsProvider(executorService);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (logCustomizer != null) {
+ logCustomizer.finished();
+ }
+
+ if (executorService != null) {
+ executorService.shutdown();
+ executorService.awaitTermination(1, TimeUnit.SECONDS);
+ if (!executorService.isTerminated()) {
+ executorService.shutdownNow();
+ }
+ }
+ }
+
+ @Test
+ public void testRequestMeters() {
+ TestInferenceServiceMetrics metrics = new
TestInferenceServiceMetrics("request");
+
+ // Verify initial state
+ assertEquals(0L,
metrics.getDirectMeter(InferenceServiceMetrics.TOTAL_REQUESTS).getCount());
+ assertEquals(0L,
metrics.getDirectMeter(InferenceServiceMetrics.INFERENCE_REQUEST_ERRORS).getCount());
+
+ // Start a request and verify the total requests meter is marked
+ TimerStats.Context context = metrics.requestStarted();
+ assertEquals(1L,
metrics.getDirectMeter(InferenceServiceMetrics.TOTAL_REQUESTS).getCount());
+
+ // Complete the request and verify the meter count remains the same
+ // No additional meters should be marked for completion
+ metrics.requestCompleted(100, context);
+ assertEquals(1L,
metrics.getDirectMeter(InferenceServiceMetrics.TOTAL_REQUESTS).getCount());
+ assertEquals(0L,
metrics.getDirectMeter(InferenceServiceMetrics.INFERENCE_REQUEST_ERRORS).getCount());
+
+ // Start another request, but this time record an error
+ context = metrics.requestStarted();
+ metrics.requestError(50, context);
+
+ // Verify both total requests and error meters are incremented
+ assertEquals(2L,
metrics.getDirectMeter(InferenceServiceMetrics.TOTAL_REQUESTS).getCount());
+ assertEquals(1L,
metrics.getDirectMeter(InferenceServiceMetrics.INFERENCE_REQUEST_ERRORS).getCount());
+
+ // Test error without context
+ metrics.requestError();
+ assertEquals(2L,
metrics.getDirectMeter(InferenceServiceMetrics.INFERENCE_REQUEST_ERRORS).getCount());
+ }
+
+ @Test
+ public void testCacheMeters() {
+ TestInferenceServiceMetrics metrics = new
TestInferenceServiceMetrics("cache");
+
+ // Verify initial state
+ assertEquals(0L,
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_HITS).getCount());
+ assertEquals(0L,
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_MISSES).getCount());
+ assertEquals(0.0, metrics.getCacheHitRate(), 0.01);
+
+ // Record cache hits and misses
+ for (int i = 0; i < 7; i++) metrics.cacheHit();
+ for (int i = 0; i < 3; i++) metrics.cacheMiss();
+
+ // Verify meters
+ assertEquals(7L,
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_HITS).getCount());
+ assertEquals(3L,
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_MISSES).getCount());
+ assertEquals(70.0, metrics.getCacheHitRate(), 0.01);
+
+ // Add more hits and verify meters are incremented correctly
+ for (int i = 0; i < 3; i++) metrics.cacheHit();
+ assertEquals(10L,
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_HITS).getCount());
+ assertEquals(3L,
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_MISSES).getCount());
+ assertEquals(10L / 13.0 * 100.0, metrics.getCacheHitRate(), 0.01);
+ }
+
+ @Test
+ public void testMultipleRequestsAndErrors() {
+ TestInferenceServiceMetrics metrics = new
TestInferenceServiceMetrics("multi");
+
+ // Record successful requests
+ for (int i = 0; i < 5; i++) {
+ TimerStats.Context context = metrics.requestStarted();
+ metrics.requestCompleted(100, context);
+ }
+
+ // Record error requests
+ for (int i = 0; i < 3; i++) {
+ TimerStats.Context context = metrics.requestStarted();
+ metrics.requestError(200, context);
+ }
+
+ // Verify meters
+ assertEquals(8L,
metrics.getDirectMeter(InferenceServiceMetrics.TOTAL_REQUESTS).getCount());
+ assertEquals(3L,
metrics.getDirectMeter(InferenceServiceMetrics.INFERENCE_REQUEST_ERRORS).getCount());
+
+ // Verify error rate through getMetrics() method
+ double errorRate = (Double)
metrics.getMetrics().get(InferenceServiceMetrics.INFERENCE_ERROR_RATE);
+ assertEquals(3.0 / 8.0 * 100.0, errorRate, 0.01);
+ }
+
+ @Test
+ public void testMetricsMap() {
+ TestInferenceServiceMetrics metrics = new
TestInferenceServiceMetrics("map");
+
+ // Record some data
+ for (int i = 0; i < 8; i++) metrics.cacheHit();
+ for (int i = 0; i < 2; i++) metrics.cacheMiss();
+
+ TimerStats.Context context = metrics.requestStarted();
+ metrics.requestCompleted(100, context);
+
+ context = metrics.requestStarted();
+ metrics.requestError(50, context);
+
+ // Get metrics map and verify its content
+ var metricsMap = metrics.getMetrics();
+
+ assertEquals(2L, metricsMap.get("totalRequests"));
+ assertEquals(8L, metricsMap.get("cacheHits"));
+ assertEquals(2L, metricsMap.get("cacheMisses"));
+ assertEquals(1L, metricsMap.get("requestErrors"));
+
+ // There's a difference in how hit rate is calculated in getMetrics()
+ // In the implementation, it uses total hits/(total requests) if total
> 0,
+ // not hits/(hits+misses) as in getCacheHitRate()
+ double hitRateFromMap = (Double)
metricsMap.get(InferenceServiceMetrics.INFERENCE_CACHE_HIT_RATE);
+ // Verify that we're using the correct calculation method
+ long hits =
metrics.getDirectMeter(InferenceServiceMetrics.CACHE_HITS).getCount();
+ long totalReq =
metrics.getDirectMeter(InferenceServiceMetrics.TOTAL_REQUESTS).getCount();
+ double calculatedRate = totalReq > 0 ? (hits * 100.0 / 2) : 0.0;
+ assertEquals(calculatedRate, hitRateFromMap, 0.01);
+
+ assertEquals(TEST_CACHE_SIZE,
metricsMap.get(InferenceServiceMetrics.INFERENCE_CACHE_SIZE));
+ assertEquals(50.0, (Double)
metricsMap.get(InferenceServiceMetrics.INFERENCE_ERROR_RATE), 0.01);
+ }
+
+ @Test
+ public void testMetricsLogging() {
+ TestInferenceServiceMetrics metrics = new
TestInferenceServiceMetrics("logging");
+
+ // Generate metrics
+ for (int i = 0; i < 5; i++) {
+ TimerStats.Context context = metrics.requestStarted();
+ metrics.requestCompleted(100, context);
+ }
+ for (int i = 0; i < 7; i++) metrics.cacheHit();
+ for (int i = 0; i < 3; i++) metrics.cacheMiss();
+
+ // Log metrics
+ metrics.logMetricsSummary();
+
+ // Verify logging
+ List<String> logs = logCustomizer.getLogs();
+ assertFalse("Should have logged metrics", logs.isEmpty());
+
+ String logMessage = logs.get(logs.size() - 1);
+ assertTrue("Log should contain service metrics",
logMessage.contains("Inference service metrics"));
+ // Validation against the actual output format of
InferenceServiceMetrics
+ assertTrue("Log should contain request count",
logMessage.contains("requests="));
+ assertTrue("Log should contain hit rate",
logMessage.contains("hitRate="));
+ assertTrue("Log should contain error rate",
logMessage.contains("errorRate="));
+ }
+
+ private class TestInferenceServiceMetrics extends InferenceServiceMetrics {
+ private final String testPrefix;
+
+ public TestInferenceServiceMetrics(String testPrefix) {
+ super(statisticsProvider, TEST_SERVICE_KEY, TEST_CACHE_SIZE);
+ this.testPrefix = testPrefix;
+ }
+
+ @Override
+ protected String getMetricName(String baseName) {
+ // This method is called with metricsServiceKey + ";" + name, so
we need to preserve that format
+ // but still add our test prefix for uniqueness
+ return testPrefix + "_" + baseName;
+ }
+
+ // Methods to directly access the stats for verification
+ public CounterStats getDirectCounter(String name) {
+ // Format the name the same way it's done in the parent class
+ return
statisticsProvider.getCounterStats(getMetricName(TEST_SERVICE_KEY + ";" +
name), StatsOptions.DEFAULT);
+ }
+
+ public MeterStats getDirectMeter(String name) {
+ // Format the name the same way it's done in the parent getMeter()
method
+ return statisticsProvider.getMeter(getMetricName(TEST_SERVICE_KEY
+ ";" + name), StatsOptions.DEFAULT);
+ }
+ }
+}
\ No newline at end of file