This is an automated email from the ASF dual-hosted git repository.

nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8913770ddd OAK-11814 - Improve error handling in 
ElasticResultRowAsyncIterator to deal with timeouts of both slow Elastic 
queries and slow readers. (#2410)
8913770ddd is described below

commit 8913770ddd8638c82b86e8c8f57ff1e6ef8491fa
Author: Nuno Santos <[email protected]>
AuthorDate: Mon Aug 4 08:15:19 2025 +0200

    OAK-11814 - Improve error handling in ElasticResultRowAsyncIterator to deal 
with timeouts of both slow Elastic queries and slow readers. (#2410)
---
 .../index/elastic/ElasticIndexProviderService.java |  15 +-
 .../plugins/index/elastic/query/ElasticIndex.java  |   7 +-
 .../index/elastic/query/ElasticIndexProvider.java  |  18 ++-
 .../query/async/ElasticResultRowAsyncIterator.java | 166 ++++++++++++++++-----
 .../index/elastic/ElasticAbstractQueryTest.java    |  11 +-
 .../elastic/ElasticIndexProviderServiceTest.java   |  23 ++-
 .../ElasticReliabilitySlowReaderQueryTest.java     | 110 ++++++++++++++
 .../ElasticReliabilitySyncIndexingTest.java        |  41 +++++
 8 files changed, 343 insertions(+), 48 deletions(-)

diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
index 896284353e..98dc55d690 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
@@ -75,6 +75,7 @@ public class ElasticIndexProviderService {
     protected static final String PROP_ELASTIC_API_KEY_ID = 
"elasticsearch.apiKeyId";
     protected static final String PROP_ELASTIC_API_KEY_SECRET = 
"elasticsearch.apiKeySecret";
     protected static final String PROP_ELASTIC_MAX_RETRY_TIME = 
"elasticsearch.maxRetryTime";
+    protected static final String 
PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS = 
"elasticsearch.asyncIteratorEnqueueTimeoutMs";
     protected static final String PROP_LOCAL_TEXT_EXTRACTION_DIR = 
"localTextExtractionDir";
     private static final boolean DEFAULT_IS_INFERENCE_ENABLED = false;
     private static final String ENV_VAR_OAK_INFERENCE_STATISTICS_DISABLED = 
"OAK_INFERENCE_STATISTICS_DISABLED";
@@ -124,6 +125,13 @@ public class ElasticIndexProviderService {
                 description = "Time in seconds that Elasticsearch should retry 
failed operations. 0 means disabled, no retries. Default is 0 seconds 
(disabled).")
         int elasticsearch_maxRetryTime() default 
ElasticConnection.DEFAULT_MAX_RETRY_TIME;
 
+        @AttributeDefinition(
+                name = "Elasticsearch Async Result Iterator Enqueue Timeout 
(ms)",
+                description = "Time in milliseconds that the async result 
iterator will wait for enqueueing results. " +
+                        "If the timeout is reached, the iterator will stop 
processing and return the results collected so far. " +
+                        "Default is 60000 ms (60 seconds).")
+        long elasticsearch_asyncIteratorEnqueueTimeoutMs() default 
ElasticIndexProvider.DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS;
+
         @AttributeDefinition(name = "Local text extraction cache path",
                 description = "Local file system path where text extraction 
cache stores/load entries to recover from timed out operation")
         String localTextExtractionDir();
@@ -233,7 +241,7 @@ public class ElasticIndexProviderService {
 
         LOG.info("Registering Index and Editor providers with connection {}", 
elasticConnection);
 
-        registerIndexProvider(bundleContext);
+        registerIndexProvider(bundleContext, config);
         final int maxRetryTime = 
Integer.getInteger(PROP_ELASTIC_MAX_RETRY_TIME, 
config.elasticsearch_maxRetryTime());
         ElasticRetryPolicy retryPolicy = new ElasticRetryPolicy(100, 
maxRetryTime * 1000L, 5, 100);
         this.elasticIndexEditorProvider = new 
ElasticIndexEditorProvider(indexTracker, elasticConnection, extractedTextCache, 
retryPolicy);
@@ -276,8 +284,9 @@ public class ElasticIndexProviderService {
         oakRegs.add(scheduleWithFixedDelay(whiteboard, task, 
contextConfig.remoteIndexCleanupFrequency()));
     }
 
-    private void registerIndexProvider(BundleContext bundleContext) {
-        ElasticIndexProvider indexProvider = new 
ElasticIndexProvider(indexTracker);
+    private void registerIndexProvider(BundleContext bundleContext, Config 
config) {
+        long asyncIteratorEnqueueTimeoutMs = 
Long.getLong(PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS, 
config.elasticsearch_asyncIteratorEnqueueTimeoutMs());
+        ElasticIndexProvider indexProvider = new 
ElasticIndexProvider(indexTracker, asyncIteratorEnqueueTimeoutMs);
 
         Dictionary<String, Object> props = new Hashtable<>();
         props.put("type", ElasticIndexDefinition.TYPE_ELASTICSEARCH);
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
index 8c3d466ed5..915a1891ef 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
@@ -44,9 +44,11 @@ class ElasticIndex extends FulltextIndex {
     private static final IteratorRewoundStateProvider 
REWOUND_STATE_PROVIDER_NOOP = () -> 0;
 
     private final ElasticIndexTracker elasticIndexTracker;
+    private final long asyncIteratorEnqueueTimeoutMs;
 
-    ElasticIndex(ElasticIndexTracker elasticIndexTracker) {
+    ElasticIndex(ElasticIndexTracker elasticIndexTracker, long 
asyncIteratorEnqueueTimeoutMs) {
         this.elasticIndexTracker = elasticIndexTracker;
+        this.asyncIteratorEnqueueTimeoutMs = asyncIteratorEnqueueTimeoutMs;
     }
 
     @Override
@@ -131,7 +133,8 @@ class ElasticIndex extends FulltextIndex {
                         responseHandler,
                         plan,
                         partialShouldInclude.apply(getPathRestriction(plan), 
filter.getPathRestriction()),
-                        elasticIndexTracker.getElasticMetricHandler()
+                        elasticIndexTracker.getElasticMetricHandler(),
+                        asyncIteratorEnqueueTimeoutMs
                 );
             }
         } finally {
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
index 5dfbf88088..9f51001e53 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
@@ -16,6 +16,7 @@
  */
 package org.apache.jackrabbit.oak.plugins.index.elastic.query;
 
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
 import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
 import org.apache.jackrabbit.oak.spi.query.QueryIndex;
 import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
@@ -26,14 +27,27 @@ import java.util.List;
 
 public class ElasticIndexProvider implements QueryIndexProvider {
 
+    public static final String ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS_PROPERTY = 
"oak.index.elastic.query.asyncIteratorEnqueueTimeoutMs";
+    public static final long DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS = 
60000L; // 60 seconds
+
     private final ElasticIndexTracker indexTracker;
+    private final long asyncIteratorEnqueueTimeoutMs;
 
-    public ElasticIndexProvider(ElasticIndexTracker indexTracker) {
+    public ElasticIndexProvider(ElasticIndexTracker indexTracker, long 
asyncIteratorEnqueueTimeoutMs) {
         this.indexTracker = indexTracker;
+        this.asyncIteratorEnqueueTimeoutMs = asyncIteratorEnqueueTimeoutMs;
+    }
+
+    public ElasticIndexProvider(ElasticIndexTracker indexTracker) {
+        this(indexTracker, 
ConfigHelper.getSystemPropertyAsLong(ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS_PROPERTY,
 DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS));
+    }
+
+    public long getAsyncIteratorEnqueueTimeoutMs() {
+        return asyncIteratorEnqueueTimeoutMs;
     }
 
     @Override
     public @NotNull List<? extends QueryIndex> getQueryIndexes(NodeState 
nodeState) {
-        return List.of(new ElasticIndex(indexTracker));
+        return List.of(new ElasticIndex(indexTracker, 
asyncIteratorEnqueueTimeoutMs));
     }
 }
diff --git 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index 452c8368c1..c9ec02fea0 100644
--- 
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++ 
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -46,11 +46,12 @@ import 
co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.BitSet;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -73,7 +74,7 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
     private static final Logger LOG = 
LoggerFactory.getLogger(ElasticResultRowAsyncIterator.class);
     // this is an internal special message to notify the consumer the result 
set has been completely returned
     private static final FulltextResultRow POISON_PILL =
-            new FulltextResultRow("___OAK_POISON_PILL___", 0d, 
Collections.emptyMap(), null, null);
+            new FulltextResultRow("___OAK_POISON_PILL___", 0d, Map.of(), null, 
null);
 
     private final BlockingQueue<FulltextResultRow> queue;
 
@@ -81,11 +82,19 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
     private final IndexPlan indexPlan;
     private final Predicate<String> rowInclusionPredicate;
     private final ElasticMetricHandler metricHandler;
+    private final long enqueueTimeoutMs;
     private final ElasticQueryScanner elasticQueryScanner;
     private final ElasticRequestHandler elasticRequestHandler;
     private final ElasticResponseHandler elasticResponseHandler;
     private final ElasticFacetProvider elasticFacetProvider;
-    private final AtomicReference<Throwable> errorRef = new 
AtomicReference<>();
+    // Errors reported by Elastic. These errors are logged but not propagated 
to the caller. They cause end of stream.
+    // This is done to keep compatibility with the Lucene implementation of 
the iterator.
+    // See for instance 
FullTextAnalyzerCommonTest#testFullTextTermWithUnescapedBraces for an example 
of a test where
+    // a parsing error in a query is swallowed by the iterator and the 
iterator returns no results.
+    private final AtomicReference<Throwable> queryErrorRef = new 
AtomicReference<>();
+    // System errors (e.g. timeout, interrupted). These errors are propagated 
to the caller.
+    private final AtomicReference<Throwable> systemErrorRef = new 
AtomicReference<>();
+    private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
     private FulltextResultRow nextRow;
 
@@ -94,30 +103,40 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
                                          @NotNull ElasticResponseHandler 
elasticResponseHandler,
                                          @NotNull QueryIndex.IndexPlan 
indexPlan,
                                          Predicate<String> 
rowInclusionPredicate,
-                                         ElasticMetricHandler metricHandler) {
+                                         ElasticMetricHandler metricHandler,
+                                         long enqueueTimeoutMs) {
         this.indexNode = indexNode;
         this.elasticRequestHandler = elasticRequestHandler;
         this.elasticResponseHandler = elasticResponseHandler;
         this.indexPlan = indexPlan;
         this.rowInclusionPredicate = rowInclusionPredicate;
         this.metricHandler = metricHandler;
+        this.enqueueTimeoutMs = enqueueTimeoutMs;
         this.elasticFacetProvider = 
elasticRequestHandler.getAsyncFacetProvider(indexNode.getConnection(), 
elasticResponseHandler);
         // set the queue size to the limit of the query. This is to avoid to 
load too many results in memory in case the
         // consumer is slow to process them
-        this.queue = new LinkedBlockingQueue<>((int) 
indexPlan.getFilter().getQueryLimits().getLimitReads());
+        int limitReads = (int) 
indexPlan.getFilter().getQueryLimits().getLimitReads();
+        LOG.debug("Creating ElasticResultRowAsyncIterator with limitReads={}", 
limitReads);
+        this.queue = new LinkedBlockingQueue<>(limitReads);
         this.elasticQueryScanner = initScanner();
     }
 
     @Override
     public boolean hasNext() {
         // if nextRow is not null it means the caller invoked hasNext() before 
without calling next()
-        if (nextRow == null) {
+        if (nextRow == null && !isClosed.get()) {
             if (queue.isEmpty()) {
                 // this triggers, when needed, the scan of the next results 
chunk
                 elasticQueryScanner.scan();
             }
             try {
-                nextRow = queue.poll(indexNode.getDefinition().queryTimeoutMs, 
TimeUnit.MILLISECONDS);
+                long timeoutMs = indexNode.getDefinition().queryTimeoutMs;
+                nextRow = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+                if (nextRow == null) {
+                    LOG.warn("Timeout waiting for next result from Elastic, 
waited {} ms. Closing scanner.", timeoutMs);
+                    close();
+                    throw new IllegalStateException("Timeout waiting for next 
result from Elastic");
+                }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();  // restore interrupt 
status
                 throw new IllegalStateException("Error reading next result 
from Elastic", e);
@@ -128,13 +147,25 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
         // Any exception (such as ParseException) during the prefetch (init 
scanner) via the async call to ES would be available here
         // when the cursor is actually being traversed.
         // This is being done so that we can log the caller stack trace in 
case of any exception from ES and not just the trace of the async query thread.
-
-        Throwable error = errorRef.getAndSet(null);
+        Throwable error = queryErrorRef.get();
         if (error != null) {
             error.fillInStackTrace();
             LOG.error("Error while fetching results from Elastic for [{}]", 
indexPlan.getFilter(), error);
+            return false;
+        }
+
+        if (nextRow != POISON_PILL) {
+            // there is a valid next row
+            return true;
+        }
+
+        // Received the POISON_PILL. Did the elastic query terminate 
gracefully?
+        Throwable systemError = systemErrorRef.get();
+        if (systemError == null) {
+            return false; // No more results, graceful termination
+        } else {
+            throw new IllegalStateException("Error while fetching results from 
Elastic for [" + indexPlan.getFilter() + "]", error);
         }
-        return !POISON_PILL.path.equals(nextRow.path);
     }
 
     @Override
@@ -145,7 +176,7 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
             }
         }
         FulltextResultRow row = null;
-        if (nextRow != null && !POISON_PILL.path.equals(nextRow.path)) {
+        if (nextRow != null &&  nextRow != POISON_PILL) {
             row = nextRow;
             nextRow = null;
         }
@@ -162,8 +193,15 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
             }
             LOG.trace("Path {} satisfies hierarchy inclusion rules", path);
             try {
-                queue.put(new FulltextResultRow(path, searchHit.score() != 
null ? searchHit.score() : 0.0,
-                        elasticResponseHandler.excerpts(searchHit), 
elasticFacetProvider, null));
+                FulltextResultRow resultRow = new FulltextResultRow(path, 
searchHit.score() != null ? searchHit.score() : 0.0,
+                        elasticResponseHandler.excerpts(searchHit), 
elasticFacetProvider, null);
+                long startNs = System.nanoTime();
+                boolean successful = queue.offer(resultRow, enqueueTimeoutMs, 
TimeUnit.MILLISECONDS);
+                if (!successful) {
+                    // if we cannot insert the result into the queue, we close 
the scanner to avoid further processing
+                    throw new IllegalStateException("Timeout waiting to insert 
result into the iterator queue for path: " + path +
+                            ". Waited " + (System.nanoTime() - startNs) / 
1_000_000 + " ms");
+                }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();  // restore interrupt 
status
                 throw new IllegalStateException("Error producing results into 
the iterator queue", e);
@@ -176,7 +214,10 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
     @Override
     public void endData() {
         try {
-            queue.put(POISON_PILL);
+            boolean success = queue.offer(POISON_PILL, enqueueTimeoutMs, 
TimeUnit.MILLISECONDS);
+            if (!success) {
+                LOG.warn("Timeout waiting to insert poison pill into the 
iterator queue. The iterator might not be closed properly.");
+            }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();  // restore interrupt status
             throw new IllegalStateException("Error inserting poison pill into 
the iterator queue", e);
@@ -207,7 +248,12 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
 
     @Override
     public void close() {
-        elasticQueryScanner.close();
+        if (isClosed.compareAndSet(false, true)) {
+            LOG.debug("Closing ElasticResultRowAsyncIterator for index {}", 
indexNode.getDefinition().getIndexPath());
+            elasticQueryScanner.close();
+        } else {
+            LOG.warn("ElasticResultRowAsyncIterator for index {} is already 
closed", indexNode.getDefinition().getIndexPath());
+        }
     }
 
     /**
@@ -230,6 +276,7 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
 
         // concurrent data structures to coordinate chunks loading
         private final AtomicBoolean anyDataLeft = new AtomicBoolean(false);
+        private final AtomicBoolean isClosed = new AtomicBoolean(false);
 
         private int scannedRows;
         private int requests;
@@ -241,6 +288,7 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
 
         // Semaphore to guarantee only one in-flight request to Elastic
         private final Semaphore semaphore = new Semaphore(1);
+        volatile private CompletableFuture<SearchResponse<ObjectNode>> 
ongoingRequest;
 
         ElasticQueryScanner(List<ElasticResponseListener> listeners) {
             this.query = elasticRequestHandler.baseQuery();
@@ -290,18 +338,18 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
             );
 
             LOG.trace("Kicking initial search for query {}", searchRequest);
-            semaphore.tryAcquire();
+            boolean permitAcquired = semaphore.tryAcquire();
+            if (!permitAcquired) {
+                LOG.warn("Semaphore not acquired for initial search, scanner 
is closing or still processing data from the previous scan");
+                throw new IllegalStateException("Scanner is closing or still 
processing data from the previous scan");
+            }
 
             searchStartTime = System.currentTimeMillis();
             requests++;
 
-            indexNode.getConnection().getAsyncClient()
+            ongoingRequest = indexNode.getConnection().getAsyncClient()
                     .search(searchRequest, ObjectNode.class)
-                    .whenComplete(((searchResponse, throwable) -> {
-                        if (throwable != null) {
-                            onFailure(throwable);
-                        } else onSuccess(searchResponse);
-                    }));
+                    .whenComplete((this::handleResponse));
             metricHandler.markQuery(indexNode.getDefinition().getIndexPath(), 
true);
         }
 
@@ -312,16 +360,15 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
          * Some code in this method relies on structure that are not thread 
safe. We need to make sure
          * these data structures are modified before releasing the semaphore.
          */
-        public void onSuccess(SearchResponse<ObjectNode> searchResponse) {
+        private void onSuccess(@NotNull SearchResponse<ObjectNode> 
searchResponse) {
             long searchTotalTime = System.currentTimeMillis() - 
searchStartTime;
-
             List<Hit<ObjectNode>> searchHits = searchResponse.hits().hits();
             int hitsSize = searchHits != null ? searchHits.size() : 0;
             
metricHandler.measureQuery(indexNode.getDefinition().getIndexPath(), hitsSize, 
searchResponse.took(),
                     searchTotalTime, searchResponse.timedOut());
             if (hitsSize > 0) {
                 long totalHits = searchResponse.hits().total().value();
-                LOG.debug("Processing search response that took {} to read 
{}/{} docs", searchResponse.took(), hitsSize, totalHits);
+                LOG.debug("Processing search response that took {} ms to read 
{}/{} docs", searchResponse.took(), hitsSize, totalHits);
                 lastHitSortValues = searchHits.get(hitsSize - 1).sort();
                 scannedRows += hitsSize;
                 if (searchResponse.hits().total().relation() == 
TotalHitsRelation.Eq) {
@@ -374,12 +421,12 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
             }
         }
 
-        public void onFailure(Throwable t) {
+        private void onFailure(@NotNull Throwable t) {
             
metricHandler.measureFailedQuery(indexNode.getDefinition().getIndexPath(),
                     System.currentTimeMillis() - searchStartTime);
             // Check in case errorRef is already set - this seems unlikely 
since we close the scanner once we hit failure.
             // But still, in case this do happen, we will log a warning.
-            Throwable error = errorRef.getAndSet(t);
+            Throwable error = queryErrorRef.getAndSet(t);
             if (error != null) {
                 LOG.warn("Error reference for async iterator was previously 
set to {}. It has now been reset to new error {}", error.getMessage(), 
t.getMessage());
             }
@@ -399,6 +446,10 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
          * Triggers a scan of a new chunk of the result set, if needed.
          */
         private void scan() {
+            if (isClosed.get()) {
+                LOG.debug("Scanner is closed, ignoring scan request");
+                return;
+            }
             if (semaphore.tryAcquire() && anyDataLeft.get()) {
                 final SearchRequest searchReq = SearchRequest.of(s -> s
                         .index(indexNode.getDefinition().getIndexAlias())
@@ -415,19 +466,45 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
                 LOG.trace("Kicking new search after query {}", searchReq);
 
                 searchStartTime = System.currentTimeMillis();
-                indexNode.getConnection().getAsyncClient()
+                ongoingRequest = indexNode.getConnection().getAsyncClient()
                         .search(searchReq, ObjectNode.class)
-                        .whenComplete(((searchResponse, throwable) -> {
-                            if (throwable != null) {
-                                onFailure(throwable);
-                            } else onSuccess(searchResponse);
-                        }));
+                        .whenComplete(this::handleResponse);
                 
metricHandler.markQuery(indexNode.getDefinition().getIndexPath(), false);
             } else {
                 LOG.trace("Scanner is closing or still processing data from 
the previous scan");
             }
         }
 
+        private void handleResponse(SearchResponse<ObjectNode> searchResponse, 
Throwable throwable) {
+            ongoingRequest = null;
+            if (isClosed.get()) {
+                LOG.info("Scanner is closed, not processing search response");
+                return;
+            }
+            try {
+                if (throwable == null) {
+                    onSuccess(searchResponse);
+                } else {
+                    onFailure(throwable);
+                }
+            } catch (Throwable t) {
+                LOG.warn("Error processing search response", t);
+                Throwable prevValue = systemErrorRef.getAndSet(t);
+                if (prevValue != null) {
+                    LOG.warn("System error reference was previously set to {}. 
It has now been reset to new error {}", prevValue.getMessage(), t.getMessage());
+                }
+                try {
+                    if (!queue.offer(POISON_PILL, enqueueTimeoutMs, 
TimeUnit.MILLISECONDS)) {
+                        LOG.warn("Timeout waiting to enqueue poison pill after 
error processing search response. The iterator might not be closed properly.");
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();  // restore interrupt 
status
+                    LOG.warn("Interrupted while trying to enqueue poison pill 
after error processing search response", e);
+                }
+                // This method should not throw exceptions, see the 
whenComplete() contract.
+            }
+        }
+
         /* picks the size in the fetch array at index=requests or the last if 
out of bound */
         private int getFetchSize(int requestId) {
             int[] queryFetchSizes = indexNode.getDefinition().queryFetchSizes;
@@ -435,11 +512,26 @@ public class ElasticResultRowAsyncIterator implements 
ElasticQueryIterator, Elas
                     queryFetchSizes[requestId] : 
queryFetchSizes[queryFetchSizes.length - 1];
         }
 
-        // close all listeners
         private void close() {
-            semaphore.release();
-            for (ElasticResponseListener l : allListeners) {
-                l.endData();
+            if (isClosed.compareAndSet(false, true)) {
+                LOG.debug("Closing ElasticQueryScanner for index {}", 
indexNode.getDefinition().getIndexPath());
+                // Close listeners and release the semaphore
+                semaphore.release();
+                for (ElasticResponseListener l : allListeners) {
+                    try {
+                        l.endData();
+                    } catch (Exception ex) {
+                        LOG.warn("Error while closing listener {}", 
l.getClass().getName(), ex);
+                    }
+                }
+                allListeners.clear();
+
+                if (ongoingRequest != null) {
+                    ongoingRequest.cancel(true);
+                    ongoingRequest = null;
+                }
+            } else {
+                LOG.info("ElasticQueryScanner for index {} is already closed", 
indexNode.getDefinition().getIndexPath());
             }
         }
     }
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
index ff1ec47812..d5fc07271e 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
@@ -157,12 +157,13 @@ public abstract class ElasticAbstractQueryTest extends 
AbstractQueryTest {
         nodeStore = getNodeStore();
         QueryEngineSettings queryEngineSettings = new QueryEngineSettings();
         queryEngineSettings.setInferenceEnabled(isInferenceEnabled());
+        queryEngineSettings.setLimitReads(limitReads());
         InferenceConfig.reInitialize(nodeStore, INFERENCE_CONFIG_PATH, 
isInferenceEnabled());
         indexTracker = new ElasticIndexTracker(esConnection, 
getMetricHandler());
         ElasticIndexEditorProvider editorProvider = new 
ElasticIndexEditorProvider(indexTracker, esConnection,
                 new ExtractedTextCache(10 * FileUtils.ONE_MB, 100),
                 getElasticRetryPolicy());
-        ElasticIndexProvider indexProvider = new 
ElasticIndexProvider(indexTracker);
+        ElasticIndexProvider indexProvider = new 
ElasticIndexProvider(indexTracker, getAsyncIteratorEnqueueTimeoutMs());
 
 
         asyncIndexUpdate = getAsyncIndexUpdate("async", nodeStore, 
CompositeIndexEditorProvider.compose(
@@ -189,6 +190,14 @@ public abstract class ElasticAbstractQueryTest extends 
AbstractQueryTest {
         return oak.createContentRepository();
     }
 
+    protected long getAsyncIteratorEnqueueTimeoutMs() {
+        return ElasticIndexProvider.DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS;
+    }
+
+    protected long limitReads() {
+        return QueryEngineSettings.DEFAULT_QUERY_LIMIT_READS;
+    }
+
     protected boolean isInferenceEnabled() {
         return true;
     }
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
index 3cd213e601..0a8cc33e5c 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
@@ -20,6 +20,7 @@ import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
 import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
 import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
 import 
org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
+import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexProvider;
 import 
org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConfig;
 import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
 import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -33,6 +34,7 @@ import org.apache.jackrabbit.oak.stats.MeterStats;
 import org.apache.jackrabbit.oak.stats.StatisticsProvider;
 import org.apache.sling.testing.mock.osgi.MockOsgi;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
+import org.jetbrains.annotations.Nullable;
 import org.junit.Before;
 import org.junit.ClassRule;
 import org.junit.Rule;
@@ -48,6 +50,7 @@ import java.util.concurrent.TimeUnit;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_DISABLED;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_API_KEY_ID;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_API_KEY_SECRET;
+import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_HOST;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_MAX_RETRY_TIME;
 import static 
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_PORT;
@@ -190,14 +193,28 @@ public class ElasticIndexProviderServiceTest {
         MockOsgi.activate(service, context.bundleContext(), props);
 
         assertNotNull(context.getService(QueryIndexProvider.class));
-        assertNotNull(context.getService(IndexEditorProvider.class));
-
-        ElasticIndexEditorProvider editorProvider = 
(ElasticIndexEditorProvider) context.getService(IndexEditorProvider.class);
+        IndexEditorProvider indexEditorProvider = 
context.getService(IndexEditorProvider.class);
+        assertNotNull(indexEditorProvider);
+        ElasticIndexEditorProvider editorProvider = 
(ElasticIndexEditorProvider) indexEditorProvider;
         assertEquals(TimeUnit.SECONDS.toMillis(600), 
editorProvider.getRetryPolicy().getMaxRetryTimeMs());
 
         MockOsgi.deactivate(service, context.bundleContext());
     }
 
+    @Test
+    public void withAsyncIteratorEnqueueTimeoutMs() { // the OSGi enqueue-timeout property must reach ElasticIndexProvider
+        Map<String, Object> props = new HashMap<>(getElasticConfig());
+        props.put(PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS, 123); // presumably non-default, so the assertion below proves the property is applied
+        MockOsgi.activate(service, context.bundleContext(), props);
+
+        QueryIndexProvider queryIndexProvider = 
context.getService(QueryIndexProvider.class);
+        assertNotNull(queryIndexProvider);
+        ElasticIndexProvider elasticIndexProvider = (ElasticIndexProvider) 
queryIndexProvider;
+        assertEquals(123, 
elasticIndexProvider.getAsyncIteratorEnqueueTimeoutMs());
+
+        MockOsgi.deactivate(service, context.bundleContext());
+    }
+
+
 
     private HashMap<String, Object> getElasticConfig() {
         HashMap<String, Object> config = new HashMap<>();
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySlowReaderQueryTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySlowReaderQueryTest.java
new file mode 100644
index 0000000000..ba4372d2ee
--- /dev/null
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySlowReaderQueryTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Result;
+import org.apache.jackrabbit.oak.api.ResultRow;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.api.Type;
+import 
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
+import org.apache.jackrabbit.oak.spi.query.QueryConstants;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.jackrabbit.oak.api.QueryEngine.NO_BINDINGS;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ElasticReliabilitySlowReaderQueryTest extends 
ElasticAbstractQueryTest { // reliability test: slow consumer of ElasticResultRowAsyncIterator results
+
+    private static final String QUERY_PROP_A = "select [jcr:path] from 
[nt:base] where propa is not null";
+
+    @Override
+    protected long getAsyncIteratorEnqueueTimeoutMs() {
+        return 1000; // 1 s: shorter than the 2 s pause of the reader in slowReader()
+    }
+
+    @Override
+    protected long limitReads() {
+        return 2; // per the slowReader() Javadoc, this limits the iterator's internal queue to 2 results
+    }
+
+    /**
+     * Tests the case where a reader thread is very slow consuming the results and the ElasticResultRowAsyncIterator
+     * times out while enqueuing results in its internal result queue. To trigger that timeout in the ElasticResultRowAsyncIterator,
+     * we set the timeout for the internal queue to 1s and limit the number of results that can be stored in the queue to 2.
+     * The following should happen:
+     * - The first read succeeds because the reader thread consumes it immediately.
+     * - The reader thread then waits for 2 seconds. During this time, the iterator enqueues 2 more results and then blocks trying
+     * to enqueue the 4th result because the queue is full. After 1 second of waiting, it times out and closes the iterator.
+     * - The reader thread wakes up and continues reading. It reads the next two results, which were successfully
+     * put in the queue before the iterator timed out, even though the iterator is already closed.
+     * - The next hasNext() call then fails with an IllegalStateException reporting an error while fetching results from Elastic.
+     */
+    @Test
+    public void slowReader() throws Exception {
+        String indexName = UUID.randomUUID().toString();
+        IndexDefinitionBuilder builder = createIndex("propa");
+        setIndex(indexName, builder);
+        root.commit();
+
+        // Populate the index
+        addNodes(6);
+
+        // Simulate a slow reader: read the first result, wait for 2 seconds, then try to read the remaining results.
+        Result result = executeQuery(QUERY_PROP_A, SQL2, NO_BINDINGS);
+        ArrayList<String> resultPaths = new ArrayList<>();
+        Iterator<? extends ResultRow> resultRows = result.getRows().iterator();
+        // Read the first result immediately
+        assertTrue(resultRows.hasNext());
+        
resultPaths.add(resultRows.next().getValue(QueryConstants.JCR_PATH).getValue(Type.STRING));
+        Thread.sleep(2000L); // Simulate slow reading; longer than the 1 s enqueue timeout configured above
+        // The iterator should have timed out trying to enqueue the 4th result. The next two results should still be
+        // available because they were enqueued before the queue got full and the iterator timed out.
+        assertTrue(resultRows.hasNext());
+        
resultPaths.add(resultRows.next().getValue(QueryConstants.JCR_PATH).getValue(Type.STRING));
+        assertTrue(resultRows.hasNext());
+        
resultPaths.add(resultRows.next().getValue(QueryConstants.JCR_PATH).getValue(Type.STRING));
+        // The next read should fail: hasNext() is expected to throw. If it returns normally, assertFalse or fail() fails the test.
+        try {
+            assertFalse(resultRows.hasNext());
+            fail("Expected an exception while reading results");
+        } catch (IllegalStateException e) {
+            assertThat(e.getMessage(), containsString("Error while fetching 
results from Elastic"));
+        }
+        assertEquals(List.of("/test/a0", "/test/a1", "/test/a2"), resultPaths);
+    }
+
+    private void addNodes(int count) throws CommitFailedException { // adds /test/a0../test/a{count-1} with propa set, then waits until they are queryable
+        Tree test = root.getTree("/").addChild("test");
+        for (int i = 0; i < count; i++) {
+            test.addChild("a" + i).setProperty("propa", "a" + i);
+        }
+        root.commit();
+        this.asyncIndexUpdate.run();
+        // NOTE(review): the expected list below is hard-coded for count == 6
+        assertEventually(() -> assertQuery(QUERY_PROP_A, List.of("/test/a0", 
"/test/a1", "/test/a2", "/test/a3", "/test/a4", "/test/a5")));
+    }
+}
diff --git 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
index 0c3d4d1a1f..63dfeee171 100644
--- 
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
+++ 
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
@@ -17,9 +17,11 @@
 package org.apache.jackrabbit.oak.plugins.index.elastic;
 
 import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
 import eu.rekawek.toxiproxy.model.toxic.LimitData;
 import org.apache.jackrabbit.oak.api.CommitFailedException;
 import org.apache.jackrabbit.oak.api.Tree;
+import 
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
 import org.junit.Test;
 
 import java.util.List;
@@ -29,6 +31,7 @@ import java.util.UUID;
 import static org.hamcrest.CoreMatchers.containsString;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
 
 public class ElasticReliabilitySyncIndexingTest extends ElasticReliabilityTest 
{
 
@@ -72,6 +75,44 @@ public class ElasticReliabilitySyncIndexingTest extends 
ElasticReliabilityTest {
         });
     }
 
+    @Test
+    public void elasticQueryTimeout() throws Exception {
+        String query = "select [jcr:path] from [nt:base] where propa is not 
null";
+
+        // Simulate a timeout of a query to the Elasticsearch cluster by setting a low query timeout on the index definition (500 ms)
+        // and a higher latency (1000 ms) on the connection to Elastic. This way, the ElasticResultRowAsyncIterator will time out
+        // waiting for the response from Elastic.
+        // Index with a low query timeout: 500 ms
+        IndexDefinitionBuilder builderPropAIndex = createIndex("propa");
+        
builderPropAIndex.getBuilderTree().setProperty(ElasticIndexDefinition.QUERY_TIMEOUT_MS,
 500L);
+        setIndex(UUID.randomUUID().toString(), builderPropAIndex);
+
+        root.commit();
+
+        Tree test = root.getTree("/").addChild("test");
+        test.addChild("a").setProperty("propa", "a");
+        root.commit();
+
+        // Wait for the index to be updated
+        assertEventually(() -> assertQuery(query, List.of("/test/a")));
+
+        // Simulate Elastic taking a long time to respond: 1000 ms of downstream latency, above the 500 ms index timeout
+        Latency slowQueryToxic = proxy.toxics()
+                .latency("slow_query", ToxicDirection.DOWNSTREAM, 1000L);
+        try {
+            // This Oak query should time out after 500 ms (the index's QUERY_TIMEOUT_MS)
+            executeQuery(query, SQL2);
+            fail("Expected a timeout exception");
+        } catch (IllegalStateException e) {
+            LOG.info("Expected timeout exception.", e);
+            assertThat(e.getMessage(), containsString("Timeout"));
+        }
+        slowQueryToxic.remove(); // NOTE(review): not in a finally block; if fail()/assertThat throws, the toxic stays installed
+
+        // After removing the latency toxic, the query should succeed
+        assertEventually(() -> assertQuery(query, List.of("/test/a")));
+    }
+
     @Test
     public void connectionCutOnIndex() throws Exception {
         String indexName = UUID.randomUUID().toString();


Reply via email to