This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 854d6823e2 OAK-11851 - Log performance metrics of statistical facets
and simplify logic (#2442)
854d6823e2 is described below
commit 854d6823e2ca85f8bbbca54cff5504aa98a5c0a4
Author: Nuno Santos <[email protected]>
AuthorDate: Thu Aug 14 15:22:58 2025 +0200
OAK-11851 - Log performance metrics of statistical facets and simplify
logic (#2442)
---
.../ElasticStatisticalFacetAsyncProvider.java | 178 ++++++++++++++-------
1 file changed, 116 insertions(+), 62 deletions(-)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
index bf7b636455..95d8a27ee1 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
@@ -44,9 +44,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -63,14 +64,23 @@ public class ElasticStatisticalFacetAsyncProvider
implements ElasticFacetProvide
private final Predicate<String> isAccessible;
private final Set<String> facetFields;
private final long facetsEvaluationTimeoutMs;
- private final Map<String, List<FulltextIndex.Facet>> allFacets = new
HashMap<>();
- private final Map<String, Map<String, MutableInt>> accessibleFacetCounts =
new ConcurrentHashMap<>();
private Map<String, List<FulltextIndex.Facet>> facets;
private final SearchRequest searchRequest;
- private final CountDownLatch latch = new CountDownLatch(1);
+ private final CompletableFuture<Map<String, List<FulltextIndex.Facet>>>
searchFuture;
private int sampled;
private long totalHits;
+ private final long queryStartTimeNanos;
+ // All these variables are updated only by the event handler thread of
Elastic. They are read either by that
+ // same thread or by the client thread that waits for the search future to
complete. Since completion of the future establishes a happens-before edge,
+ // the updated values will be visible to the client thread.
+ private long queryTimeNanos;
+ private long processAggregationsTimeNanos;
+ // It is written by multiple threads, so we use LongAdder for better
performance than AtomicLong
+ private final LongAdder aclTestTimeNanos = new LongAdder();
+ private long processHitsTimeNanos;
+ private long computeStatisticalFacetsTimeNanos;
+
ElasticStatisticalFacetAsyncProvider(ElasticConnection connection,
ElasticIndexDefinition indexDefinition,
ElasticRequestHandler
elasticRequestHandler, ElasticResponseHandler elasticResponseHandler,
Predicate<String> isAccessible, int
sampleSize, long facetsEvaluationTimeoutMs) {
@@ -96,87 +106,120 @@ public class ElasticStatisticalFacetAsyncProvider
implements ElasticFacetProvide
)
);
+ this.queryStartTimeNanos = System.nanoTime();
LOG.trace("Kicking search query with random sampling {}",
searchRequest);
- CompletableFuture<SearchResponse<ObjectNode>> searchFuture =
- connection.getAsyncClient().search(searchRequest,
ObjectNode.class);
-
- searchFuture.whenCompleteAsync((searchResponse, throwable) -> {
- try {
- if (throwable != null) {
- LOG.error("Error while retrieving sample documents. Search
request: {}", searchRequest, throwable);
- } else {
- List<Hit<ObjectNode>> searchHits =
searchResponse.hits().hits();
- this.sampled = searchHits != null ? searchHits.size() : 0;
- if (sampled > 0) {
- this.totalHits = searchResponse.hits().total().value();
- processAggregations(searchResponse.aggregations());
- searchResponse.hits().hits().forEach(this::processHit);
- computeStatisticalFacets();
- }
- }
- } finally {
- latch.countDown();
- }
- });
+ this.searchFuture = connection.getAsyncClient()
+ .search(searchRequest, ObjectNode.class)
+ .thenApplyAsync(this::computeFacets);
}
@Override
public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String
columnName) {
- LOG.trace("Requested facets for {} - Latch count: {}", columnName,
latch.getCount());
- try {
- boolean completed = latch.await(facetsEvaluationTimeoutMs,
TimeUnit.MILLISECONDS);
- if (!completed) {
- LOG.error("Timed out while waiting for facets. Search request:
{}", searchRequest);
- throw new IllegalStateException("Timed out while waiting for
facets");
+ // TODO: In case of failure, we log an exception and return null. This
is likely not the ideal behavior, as the
+ // caller has no way to distinguish between a failure and empty
results. But in this PR I'm leaving this
+ // behavior as is to not introduce further changes. We should revise
this behavior once the queries for facets
+ // are decoupled from the query for results, as this will make it
easier to better handle errors
+ if (!searchFuture.isDone()) {
+ try {
+ LOG.trace("Requested facets for {}. Waiting up to: {} ms",
columnName, facetsEvaluationTimeoutMs);
+ long start = System.nanoTime();
+ facets = searchFuture.get(facetsEvaluationTimeoutMs,
TimeUnit.MILLISECONDS);
+ LOG.trace("Facets computed in {} ms.",
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ } catch (ExecutionException e) {
+ LOG.error("Error evaluating facets", e);
+ } catch (TimeoutException e) {
+ searchFuture.cancel(true);
+ LOG.error("Timed out while waiting for facets. Search request:
{}. {}", searchRequest, timingsToString());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt
status
+ throw new IllegalStateException("Error while waiting for
facets", e);
}
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt(); // restore interrupt status
- throw new IllegalStateException("Error while waiting for facets",
e);
}
LOG.trace("Reading facets for {} from {}", columnName, facets);
String field =
ElasticIndexUtils.fieldName(FulltextIndex.parseFacetField(columnName));
return facets != null ? facets.get(field) : null;
}
- private void processHit(Hit<ObjectNode> searchHit) {
- final String path = elasticResponseHandler.getPath(searchHit);
- if (path != null && isAccessible.test(path)) {
- for (String field : facetFields) {
- JsonNode value = searchHit.source().get(field);
- if (value != null) {
- accessibleFacetCounts.compute(field, (column, facetValues)
-> {
- if (facetValues == null) {
- Map<String, MutableInt> values = new HashMap<>();
- values.put(value.asText(), new MutableInt(1));
- return values;
- } else {
- facetValues.compute(value.asText(), (k, v) -> {
- if (v == null) {
- return new MutableInt(1);
- } else {
- v.increment();
- return v;
- }
- });
- return facetValues;
- }
- });
- }
+ private Map<String, List<FulltextIndex.Facet>>
computeFacets(SearchResponse<ObjectNode> searchResponse) {
+ LOG.trace("SearchResponse: {}", searchResponse);
+ this.queryTimeNanos = System.nanoTime() - queryStartTimeNanos;
+ List<Hit<ObjectNode>> searchHits = searchResponse.hits().hits();
+ this.sampled = searchHits != null ? searchHits.size() : 0;
+ if (sampled > 0) {
+ this.totalHits = searchResponse.hits().total().value();
+ Map<String, List<FulltextIndex.Facet>> allFacets =
processAggregations(searchResponse.aggregations());
+ Map<String, Map<String, MutableInt>> accessibleFacetCounts = new
HashMap<>();
+ searchResponse.hits().hits().stream()
+ // Possible candidate for parallelization using parallel
streams
+ .filter(this::isAccessible)
+ .forEach(hit -> processFilteredHit(hit,
accessibleFacetCounts));
+ Map<String, List<FulltextIndex.Facet>> facets =
computeStatisticalFacets(allFacets, accessibleFacetCounts);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(timingsToString());
+ }
+ return facets;
+ } else {
+ return Map.of();
+ }
+ }
+
+ private void processFilteredHit(Hit<ObjectNode> searchHit, Map<String,
Map<String, MutableInt>> accessibleFacetCounts) {
+ long start = System.nanoTime();
+ ObjectNode source = searchHit.source();
+ for (String field : facetFields) {
+ JsonNode value = source.get(field);
+ if (value != null) {
+ accessibleFacetCounts.compute(field, (column, facetValues) -> {
+ if (facetValues == null) {
+ Map<String, MutableInt> values = new HashMap<>();
+ values.put(value.asText(), new MutableInt(1));
+ return values;
+ } else {
+ facetValues.compute(value.asText(), (k, v) -> {
+ if (v == null) {
+ return new MutableInt(1);
+ } else {
+ v.increment();
+ return v;
+ }
+ });
+ return facetValues;
+ }
+ });
}
}
+ this.processHitsTimeNanos += System.nanoTime() - start;
}
- private void processAggregations(Map<String, Aggregate> aggregations) {
+ private boolean isAccessible(Hit<ObjectNode> searchHit) {
+ long start = System.nanoTime();
+ String path = elasticResponseHandler.getPath(searchHit);
+ boolean result = path != null && isAccessible.test(path);
+ long durationNanos = System.nanoTime() - start;
+ long durationMillis = TimeUnit.NANOSECONDS.toMillis(durationNanos);
+ if (durationMillis > 10) {
+ LOG.debug("Slow path checking ACLs: {}, {} ms", path,
durationMillis);
+ }
+ aclTestTimeNanos.add(durationNanos);
+ return result;
+ }
+
+ private Map<String, List<FulltextIndex.Facet>>
processAggregations(Map<String, Aggregate> aggregations) {
+ long start = System.nanoTime();
+ Map<String, List<FulltextIndex.Facet>> allFacets = new HashMap<>();
for (String field : facetFields) {
List<StringTermsBucket> buckets =
aggregations.get(field).sterms().buckets().array();
allFacets.put(field, buckets.stream()
.map(b -> new FulltextIndex.Facet(b.key().stringValue(),
(int) b.docCount()))
- .collect(Collectors.toList())
+ .collect(Collectors.toUnmodifiableList())
);
}
+ this.processAggregationsTimeNanos = System.nanoTime() - start;
+ return allFacets;
}
- private void computeStatisticalFacets() {
+ private Map<String, List<FulltextIndex.Facet>>
computeStatisticalFacets(Map<String, List<FulltextIndex.Facet>> allFacets,
Map<String, Map<String, MutableInt>> accessibleFacetCounts) {
+ long start = System.nanoTime();
for (String facetKey : allFacets.keySet()) {
if (accessibleFacetCounts.containsKey(facetKey)) {
Map<String, MutableInt> accessibleFacet =
accessibleFacetCounts.get(facetKey);
@@ -205,7 +248,18 @@ public class ElasticStatisticalFacetAsyncProvider
implements ElasticFacetProvide
.collect(Collectors.toList())
)
);
+ this.computeStatisticalFacetsTimeNanos = System.nanoTime() - start;
LOG.trace("Statistical facets {}", facets);
+ return facets;
}
+ private String timingsToString() {
+ return String.format("Facet computation times: {query: %d ms,
processAggregations: %d ms, filterByAcl: %d ms, processHits: %d ms,
computeStatisticalFacets: %d ms}. Total hits: %d, samples: %d",
+ queryTimeNanos > 0 ?
TimeUnit.NANOSECONDS.toMillis(queryTimeNanos) : -1,
+ processAggregationsTimeNanos > 0 ?
TimeUnit.NANOSECONDS.toMillis(processAggregationsTimeNanos) : -1,
+ aclTestTimeNanos.sum() > 0 ?
TimeUnit.NANOSECONDS.toMillis(aclTestTimeNanos.sum()) : -1,
+ processHitsTimeNanos > 0 ?
TimeUnit.NANOSECONDS.toMillis(processHitsTimeNanos) : -1,
+ computeStatisticalFacetsTimeNanos > 0 ?
TimeUnit.NANOSECONDS.toMillis(computeStatisticalFacetsTimeNanos) : -1,
+ totalHits, sampled);
+ }
}