This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 854d6823e2 OAK-11851 - Log performance metrics of statistical facets 
and simplify logic (#2442)
854d6823e2 is described below

commit 854d6823e2ca85f8bbbca54cff5504aa98a5c0a4
Author: Nuno Santos <[email protected]>
AuthorDate: Thu Aug 14 15:22:58 2025 +0200

    OAK-11851 - Log performance metrics of statistical facets and simplify 
logic (#2442)
---
 .../ElasticStatisticalFacetAsyncProvider.java      | 178 ++++++++++++++-------
 1 file changed, 116 insertions(+), 62 deletions(-)

diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
index bf7b636455..95d8a27ee1 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/facets/ElasticStatisticalFacetAsyncProvider.java
@@ -44,9 +44,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.LongAdder;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
@@ -63,14 +64,23 @@ public class ElasticStatisticalFacetAsyncProvider 
implements ElasticFacetProvide
     private final Predicate<String> isAccessible;
     private final Set<String> facetFields;
     private final long facetsEvaluationTimeoutMs;
-    private final Map<String, List<FulltextIndex.Facet>> allFacets = new 
HashMap<>();
-    private final Map<String, Map<String, MutableInt>> accessibleFacetCounts = 
new ConcurrentHashMap<>();
     private Map<String, List<FulltextIndex.Facet>> facets;
     private final SearchRequest searchRequest;
-    private final CountDownLatch latch = new CountDownLatch(1);
+    private final CompletableFuture<Map<String, List<FulltextIndex.Facet>>> 
searchFuture;
     private int sampled;
     private long totalHits;
 
+    private final long queryStartTimeNanos;
+    // All these variables are updated only by the event handler thread of 
Elastic. They are read either by that
+    // same thread or by the client thread that waits for searchFuture to 
complete. Since completing the future establishes a happens-before edge,
+    // the updated values will be visible to the client thread.
+    private long queryTimeNanos;
+    private long processAggregationsTimeNanos;
+    // It is written by multiple threads, so we use LongAdder for better 
performance than AtomicLong
+    private final LongAdder aclTestTimeNanos = new LongAdder();
+    private long processHitsTimeNanos;
+    private long computeStatisticalFacetsTimeNanos;
+
     ElasticStatisticalFacetAsyncProvider(ElasticConnection connection, 
ElasticIndexDefinition indexDefinition,
                                          ElasticRequestHandler 
elasticRequestHandler, ElasticResponseHandler elasticResponseHandler,
                                          Predicate<String> isAccessible, int 
sampleSize, long facetsEvaluationTimeoutMs) {
@@ -96,87 +106,120 @@ public class ElasticStatisticalFacetAsyncProvider 
implements ElasticFacetProvide
                 )
         );
 
+        this.queryStartTimeNanos = System.nanoTime();
         LOG.trace("Kicking search query with random sampling {}", 
searchRequest);
-        CompletableFuture<SearchResponse<ObjectNode>> searchFuture =
-                connection.getAsyncClient().search(searchRequest, 
ObjectNode.class);
-
-        searchFuture.whenCompleteAsync((searchResponse, throwable) -> {
-            try {
-                if (throwable != null) {
-                    LOG.error("Error while retrieving sample documents. Search 
request: {}", searchRequest, throwable);
-                } else {
-                    List<Hit<ObjectNode>> searchHits = 
searchResponse.hits().hits();
-                    this.sampled = searchHits != null ? searchHits.size() : 0;
-                    if (sampled > 0) {
-                        this.totalHits = searchResponse.hits().total().value();
-                        processAggregations(searchResponse.aggregations());
-                        searchResponse.hits().hits().forEach(this::processHit);
-                        computeStatisticalFacets();
-                    }
-                }
-            } finally {
-                latch.countDown();
-            }
-        });
+        this.searchFuture = connection.getAsyncClient()
+                .search(searchRequest, ObjectNode.class)
+                .thenApplyAsync(this::computeFacets);
     }
 
     @Override
     public List<FulltextIndex.Facet> getFacets(int numberOfFacets, String 
columnName) {
-        LOG.trace("Requested facets for {} - Latch count: {}", columnName, 
latch.getCount());
-        try {
-            boolean completed = latch.await(facetsEvaluationTimeoutMs, 
TimeUnit.MILLISECONDS);
-            if (!completed) {
-                LOG.error("Timed out while waiting for facets. Search request: 
{}", searchRequest);
-                throw new IllegalStateException("Timed out while waiting for 
facets");
+        // TODO: In case of failure, we log an exception and return null. This 
is likely not the ideal behavior, as the
+        //   caller has no way to distinguish between a failure and empty 
results. But in this PR I'm leaving this
+        //   behavior as is to not introduce further changes. We should revise 
this behavior once the queries for facets
+        //   are decoupled from the query for results, as this will make it 
easier to better handle errors
+        if (!searchFuture.isDone()) {
+            try {
+                LOG.trace("Requested facets for {}. Waiting up to: {}", 
columnName, facetsEvaluationTimeoutMs);
+                long start = System.nanoTime();
+                facets = searchFuture.get(facetsEvaluationTimeoutMs, 
TimeUnit.MILLISECONDS);
+                LOG.trace("Facets computed in {}.", 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+            } catch (ExecutionException e) {
+                LOG.error("Error evaluating facets", e);
+            } catch (TimeoutException e) {
+                searchFuture.cancel(true);
+                LOG.error("Timed out while waiting for facets. Search request: 
{}. {}", searchRequest, timingsToString());
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();  // restore interrupt 
status
+                throw new IllegalStateException("Error while waiting for 
facets", e);
             }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();  // restore interrupt status
-            throw new IllegalStateException("Error while waiting for facets", 
e);
         }
         LOG.trace("Reading facets for {} from {}", columnName, facets);
         String field = 
ElasticIndexUtils.fieldName(FulltextIndex.parseFacetField(columnName));
         return facets != null ? facets.get(field) : null;
     }
 
-    private void processHit(Hit<ObjectNode> searchHit) {
-        final String path = elasticResponseHandler.getPath(searchHit);
-        if (path != null && isAccessible.test(path)) {
-            for (String field : facetFields) {
-                JsonNode value = searchHit.source().get(field);
-                if (value != null) {
-                    accessibleFacetCounts.compute(field, (column, facetValues) 
-> {
-                        if (facetValues == null) {
-                            Map<String, MutableInt> values = new HashMap<>();
-                            values.put(value.asText(), new MutableInt(1));
-                            return values;
-                        } else {
-                            facetValues.compute(value.asText(), (k, v) -> {
-                                if (v == null) {
-                                    return new MutableInt(1);
-                                } else {
-                                    v.increment();
-                                    return v;
-                                }
-                            });
-                            return facetValues;
-                        }
-                    });
-                }
+    private Map<String, List<FulltextIndex.Facet>> 
computeFacets(SearchResponse<ObjectNode> searchResponse) {
+        LOG.trace("SearchResponse: {}", searchResponse);
+        this.queryTimeNanos = System.nanoTime() - queryStartTimeNanos;
+        List<Hit<ObjectNode>> searchHits = searchResponse.hits().hits();
+        this.sampled = searchHits != null ? searchHits.size() : 0;
+        if (sampled > 0) {
+            this.totalHits = searchResponse.hits().total().value();
+            Map<String, List<FulltextIndex.Facet>> allFacets = 
processAggregations(searchResponse.aggregations());
+            Map<String, Map<String, MutableInt>> accessibleFacetCounts = new 
HashMap<>();
+            searchResponse.hits().hits().stream()
+                    // Possible candidate for parallelization using parallel 
streams; note that accessibleFacetCounts (plain HashMap) and the
+                    // non-atomic processHitsTimeNanos accumulation would 
first need to be made thread-safe
+                    .filter(this::isAccessible)
+                    .forEach(hit -> processFilteredHit(hit, 
accessibleFacetCounts));
+            Map<String, List<FulltextIndex.Facet>> facets = 
computeStatisticalFacets(allFacets, accessibleFacetCounts);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(timingsToString());
+            }
+            return facets;
+        } else {
+            return Map.of();
+        }
+    }
+
+    private void processFilteredHit(Hit<ObjectNode> searchHit, Map<String, 
Map<String, MutableInt>> accessibleFacetCounts) {
+        long start = System.nanoTime();
+        ObjectNode source = searchHit.source();
+        for (String field : facetFields) {
+            JsonNode value = source.get(field);
+            if (value != null) {
+                accessibleFacetCounts.compute(field, (column, facetValues) -> {
+                    if (facetValues == null) {
+                        Map<String, MutableInt> values = new HashMap<>();
+                        values.put(value.asText(), new MutableInt(1));
+                        return values;
+                    } else {
+                        facetValues.compute(value.asText(), (k, v) -> {
+                            if (v == null) {
+                                return new MutableInt(1);
+                            } else {
+                                v.increment();
+                                return v;
+                            }
+                        });
+                        return facetValues;
+                    }
+                });
             }
         }
+        this.processHitsTimeNanos += System.nanoTime() - start;
     }
 
-    private void processAggregations(Map<String, Aggregate> aggregations) {
+    private boolean isAccessible(Hit<ObjectNode> searchHit) {
+        long start = System.nanoTime();
+        String path = elasticResponseHandler.getPath(searchHit);
+        boolean result = path != null && isAccessible.test(path);
+        long durationNanos = System.nanoTime() - start;
+        long durationMillis = TimeUnit.NANOSECONDS.toMillis(durationNanos);
+        if (durationMillis > 10) {
+            LOG.debug("Slow path checking ACLs: {}, {} ms", path, 
durationMillis);
+        }
+        aclTestTimeNanos.add(durationNanos);
+        return result;
+    }
+
+    private Map<String, List<FulltextIndex.Facet>> 
processAggregations(Map<String, Aggregate> aggregations) {
+        long start = System.nanoTime();
+        Map<String, List<FulltextIndex.Facet>> allFacets = new HashMap<>();
         for (String field : facetFields) {
             List<StringTermsBucket> buckets = 
aggregations.get(field).sterms().buckets().array();
             allFacets.put(field, buckets.stream()
                     .map(b -> new FulltextIndex.Facet(b.key().stringValue(), 
(int) b.docCount()))
-                    .collect(Collectors.toList())
+                    .collect(Collectors.toUnmodifiableList())
             );
         }
+        this.processAggregationsTimeNanos = System.nanoTime() - start;
+        return allFacets;
     }
 
-    private void computeStatisticalFacets() {
+    private Map<String, List<FulltextIndex.Facet>> 
computeStatisticalFacets(Map<String, List<FulltextIndex.Facet>> allFacets, 
Map<String, Map<String, MutableInt>> accessibleFacetCounts) {
+        long start = System.nanoTime();
         for (String facetKey : allFacets.keySet()) {
             if (accessibleFacetCounts.containsKey(facetKey)) {
                 Map<String, MutableInt> accessibleFacet = 
accessibleFacetCounts.get(facetKey);
@@ -205,7 +248,18 @@ public class ElasticStatisticalFacetAsyncProvider 
implements ElasticFacetProvide
                                 .collect(Collectors.toList())
                         )
                 );
+        this.computeStatisticalFacetsTimeNanos = System.nanoTime() - start;
         LOG.trace("Statistical facets {}", facets);
+        return facets;
     }
 
+    private String timingsToString() {
+        return String.format("Facet computation times: {query: %d ms, 
processAggregations: %d ms, filterByAcl: %d ms, processHits: %d ms, 
computeStatisticalFacets: %d ms}. Total hits: %d, samples: %d",
+                queryTimeNanos > 0 ? 
TimeUnit.NANOSECONDS.toMillis(queryTimeNanos) : -1,
+                processAggregationsTimeNanos > 0 ? 
TimeUnit.NANOSECONDS.toMillis(processAggregationsTimeNanos) : -1,
+                aclTestTimeNanos.sum() > 0 ? 
TimeUnit.NANOSECONDS.toMillis(aclTestTimeNanos.sum()) : -1,
+                processHitsTimeNanos > 0 ? 
TimeUnit.NANOSECONDS.toMillis(processHitsTimeNanos) : -1,
+                computeStatisticalFacetsTimeNanos > 0 ? 
TimeUnit.NANOSECONDS.toMillis(computeStatisticalFacetsTimeNanos) : -1,
+                totalHits, sampled);
+    }
 }

Reply via email to