This is an automated email from the ASF dual-hosted git repository.
nfsantos pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8913770ddd OAK-11814 - Improve error handling in
ElasticResultRowAsyncIterator to deal with timeouts of both slow Elastic
queries and slow readers. (#2410)
8913770ddd is described below
commit 8913770ddd8638c82b86e8c8f57ff1e6ef8491fa
Author: Nuno Santos <[email protected]>
AuthorDate: Mon Aug 4 08:15:19 2025 +0200
OAK-11814 - Improve error handling in ElasticResultRowAsyncIterator to deal
with timeouts of both slow Elastic queries and slow readers. (#2410)
---
.../index/elastic/ElasticIndexProviderService.java | 15 +-
.../plugins/index/elastic/query/ElasticIndex.java | 7 +-
.../index/elastic/query/ElasticIndexProvider.java | 18 ++-
.../query/async/ElasticResultRowAsyncIterator.java | 166 ++++++++++++++++-----
.../index/elastic/ElasticAbstractQueryTest.java | 11 +-
.../elastic/ElasticIndexProviderServiceTest.java | 23 ++-
.../ElasticReliabilitySlowReaderQueryTest.java | 110 ++++++++++++++
.../ElasticReliabilitySyncIndexingTest.java | 41 +++++
8 files changed, 343 insertions(+), 48 deletions(-)
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
index 896284353e..98dc55d690 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderService.java
@@ -75,6 +75,7 @@ public class ElasticIndexProviderService {
protected static final String PROP_ELASTIC_API_KEY_ID =
"elasticsearch.apiKeyId";
protected static final String PROP_ELASTIC_API_KEY_SECRET =
"elasticsearch.apiKeySecret";
protected static final String PROP_ELASTIC_MAX_RETRY_TIME =
"elasticsearch.maxRetryTime";
+ protected static final String
PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS =
"elasticsearch.asyncIteratorEnqueueTimeoutMs";
protected static final String PROP_LOCAL_TEXT_EXTRACTION_DIR =
"localTextExtractionDir";
private static final boolean DEFAULT_IS_INFERENCE_ENABLED = false;
private static final String ENV_VAR_OAK_INFERENCE_STATISTICS_DISABLED =
"OAK_INFERENCE_STATISTICS_DISABLED";
@@ -124,6 +125,13 @@ public class ElasticIndexProviderService {
description = "Time in seconds that Elasticsearch should retry
failed operations. 0 means disabled, no retries. Default is 0 seconds
(disabled).")
int elasticsearch_maxRetryTime() default
ElasticConnection.DEFAULT_MAX_RETRY_TIME;
+ @AttributeDefinition(
+ name = "Elasticsearch Async Result Iterator Enqueue Timeout
(ms)",
+ description = "Time in milliseconds that the async result
iterator will wait for enqueueing results. " +
+                    "If the timeout is reached, the query is aborted and an
error is propagated to the caller. " +
+ "Default is 60000 ms (60 seconds).")
+ long elasticsearch_asyncIteratorEnqueueTimeoutMs() default
ElasticIndexProvider.DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS;
+
@AttributeDefinition(name = "Local text extraction cache path",
description = "Local file system path where text extraction
cache stores/load entries to recover from timed out operation")
String localTextExtractionDir();
@@ -233,7 +241,7 @@ public class ElasticIndexProviderService {
LOG.info("Registering Index and Editor providers with connection {}",
elasticConnection);
- registerIndexProvider(bundleContext);
+ registerIndexProvider(bundleContext, config);
final int maxRetryTime =
Integer.getInteger(PROP_ELASTIC_MAX_RETRY_TIME,
config.elasticsearch_maxRetryTime());
ElasticRetryPolicy retryPolicy = new ElasticRetryPolicy(100,
maxRetryTime * 1000L, 5, 100);
this.elasticIndexEditorProvider = new
ElasticIndexEditorProvider(indexTracker, elasticConnection, extractedTextCache,
retryPolicy);
@@ -276,8 +284,9 @@ public class ElasticIndexProviderService {
oakRegs.add(scheduleWithFixedDelay(whiteboard, task,
contextConfig.remoteIndexCleanupFrequency()));
}
- private void registerIndexProvider(BundleContext bundleContext) {
- ElasticIndexProvider indexProvider = new
ElasticIndexProvider(indexTracker);
+ private void registerIndexProvider(BundleContext bundleContext, Config
config) {
+ long asyncIteratorEnqueueTimeoutMs =
Long.getLong(PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS,
config.elasticsearch_asyncIteratorEnqueueTimeoutMs());
+ ElasticIndexProvider indexProvider = new
ElasticIndexProvider(indexTracker, asyncIteratorEnqueueTimeoutMs);
Dictionary<String, Object> props = new Hashtable<>();
props.put("type", ElasticIndexDefinition.TYPE_ELASTICSEARCH);
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
index 8c3d466ed5..915a1891ef 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndex.java
@@ -44,9 +44,11 @@ class ElasticIndex extends FulltextIndex {
private static final IteratorRewoundStateProvider
REWOUND_STATE_PROVIDER_NOOP = () -> 0;
private final ElasticIndexTracker elasticIndexTracker;
+ private final long asyncIteratorEnqueueTimeoutMs;
- ElasticIndex(ElasticIndexTracker elasticIndexTracker) {
+ ElasticIndex(ElasticIndexTracker elasticIndexTracker, long
asyncIteratorEnqueueTimeoutMs) {
this.elasticIndexTracker = elasticIndexTracker;
+ this.asyncIteratorEnqueueTimeoutMs = asyncIteratorEnqueueTimeoutMs;
}
@Override
@@ -131,7 +133,8 @@ class ElasticIndex extends FulltextIndex {
responseHandler,
plan,
partialShouldInclude.apply(getPathRestriction(plan),
filter.getPathRestriction()),
- elasticIndexTracker.getElasticMetricHandler()
+ elasticIndexTracker.getElasticMetricHandler(),
+ asyncIteratorEnqueueTimeoutMs
);
}
} finally {
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
index 5dfbf88088..9f51001e53 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/ElasticIndexProvider.java
@@ -16,6 +16,7 @@
*/
package org.apache.jackrabbit.oak.plugins.index.elastic.query;
+import org.apache.jackrabbit.oak.plugins.index.ConfigHelper;
import org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexTracker;
import org.apache.jackrabbit.oak.spi.query.QueryIndex;
import org.apache.jackrabbit.oak.spi.query.QueryIndexProvider;
@@ -26,14 +27,27 @@ import java.util.List;
public class ElasticIndexProvider implements QueryIndexProvider {
+ public static final String ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS_PROPERTY =
"oak.index.elastic.query.asyncIteratorEnqueueTimeoutMs";
+ public static final long DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS =
60000L; // 60 seconds
+
private final ElasticIndexTracker indexTracker;
+ private final long asyncIteratorEnqueueTimeoutMs;
- public ElasticIndexProvider(ElasticIndexTracker indexTracker) {
+ public ElasticIndexProvider(ElasticIndexTracker indexTracker, long
asyncIteratorEnqueueTimeoutMs) {
this.indexTracker = indexTracker;
+ this.asyncIteratorEnqueueTimeoutMs = asyncIteratorEnqueueTimeoutMs;
+ }
+
+ public ElasticIndexProvider(ElasticIndexTracker indexTracker) {
+ this(indexTracker,
ConfigHelper.getSystemPropertyAsLong(ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS_PROPERTY,
DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS));
+ }
+
+ public long getAsyncIteratorEnqueueTimeoutMs() {
+ return asyncIteratorEnqueueTimeoutMs;
}
@Override
public @NotNull List<? extends QueryIndex> getQueryIndexes(NodeState
nodeState) {
- return List.of(new ElasticIndex(indexTracker));
+ return List.of(new ElasticIndex(indexTracker,
asyncIteratorEnqueueTimeoutMs));
}
}
diff --git
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
index 452c8368c1..c9ec02fea0 100644
---
a/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
+++
b/oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java
@@ -46,11 +46,12 @@ import
co.elastic.clients.elasticsearch.core.search.TotalHitsRelation;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
@@ -73,7 +74,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
private static final Logger LOG =
LoggerFactory.getLogger(ElasticResultRowAsyncIterator.class);
// this is an internal special message to notify the consumer the result
set has been completely returned
private static final FulltextResultRow POISON_PILL =
- new FulltextResultRow("___OAK_POISON_PILL___", 0d,
Collections.emptyMap(), null, null);
+ new FulltextResultRow("___OAK_POISON_PILL___", 0d, Map.of(), null,
null);
private final BlockingQueue<FulltextResultRow> queue;
@@ -81,11 +82,19 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
private final IndexPlan indexPlan;
private final Predicate<String> rowInclusionPredicate;
private final ElasticMetricHandler metricHandler;
+ private final long enqueueTimeoutMs;
private final ElasticQueryScanner elasticQueryScanner;
private final ElasticRequestHandler elasticRequestHandler;
private final ElasticResponseHandler elasticResponseHandler;
private final ElasticFacetProvider elasticFacetProvider;
- private final AtomicReference<Throwable> errorRef = new
AtomicReference<>();
+ // Errors reported by Elastic. These errors are logged but not propagated
to the caller. They cause end of stream.
+ // This is done to keep compatibility with the Lucene implementation of
the iterator.
+ // See for instance
FullTextAnalyzerCommonTest#testFullTextTermWithUnescapedBraces for an example
of a test where
+ // a parsing error in a query is swallowed by the iterator and the
iterator returns no results.
+ private final AtomicReference<Throwable> queryErrorRef = new
AtomicReference<>();
+ // System errors (e.g. timeout, interrupted). These errors are propagated
to the caller.
+ private final AtomicReference<Throwable> systemErrorRef = new
AtomicReference<>();
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
private FulltextResultRow nextRow;
@@ -94,30 +103,40 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
@NotNull ElasticResponseHandler
elasticResponseHandler,
@NotNull QueryIndex.IndexPlan
indexPlan,
Predicate<String>
rowInclusionPredicate,
- ElasticMetricHandler metricHandler) {
+ ElasticMetricHandler metricHandler,
+ long enqueueTimeoutMs) {
this.indexNode = indexNode;
this.elasticRequestHandler = elasticRequestHandler;
this.elasticResponseHandler = elasticResponseHandler;
this.indexPlan = indexPlan;
this.rowInclusionPredicate = rowInclusionPredicate;
this.metricHandler = metricHandler;
+ this.enqueueTimeoutMs = enqueueTimeoutMs;
this.elasticFacetProvider =
elasticRequestHandler.getAsyncFacetProvider(indexNode.getConnection(),
elasticResponseHandler);
// set the queue size to the limit of the query. This is to avoid to
load too many results in memory in case the
// consumer is slow to process them
- this.queue = new LinkedBlockingQueue<>((int)
indexPlan.getFilter().getQueryLimits().getLimitReads());
+ int limitReads = (int)
indexPlan.getFilter().getQueryLimits().getLimitReads();
+ LOG.debug("Creating ElasticResultRowAsyncIterator with limitReads={}",
limitReads);
+ this.queue = new LinkedBlockingQueue<>(limitReads);
this.elasticQueryScanner = initScanner();
}
@Override
public boolean hasNext() {
// if nextRow is not null it means the caller invoked hasNext() before
without calling next()
- if (nextRow == null) {
+ if (nextRow == null && !isClosed.get()) {
if (queue.isEmpty()) {
// this triggers, when needed, the scan of the next results
chunk
elasticQueryScanner.scan();
}
try {
- nextRow = queue.poll(indexNode.getDefinition().queryTimeoutMs,
TimeUnit.MILLISECONDS);
+ long timeoutMs = indexNode.getDefinition().queryTimeoutMs;
+ nextRow = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
+ if (nextRow == null) {
+ LOG.warn("Timeout waiting for next result from Elastic,
waited {} ms. Closing scanner.", timeoutMs);
+ close();
+ throw new IllegalStateException("Timeout waiting for next
result from Elastic");
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt
status
throw new IllegalStateException("Error reading next result
from Elastic", e);
@@ -128,13 +147,25 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
// Any exception (such as ParseException) during the prefetch (init
scanner) via the async call to ES would be available here
// when the cursor is actually being traversed.
// This is being done so that we can log the caller stack trace in
case of any exception from ES and not just the trace of the async query thread.
-
- Throwable error = errorRef.getAndSet(null);
+ Throwable error = queryErrorRef.get();
if (error != null) {
error.fillInStackTrace();
LOG.error("Error while fetching results from Elastic for [{}]",
indexPlan.getFilter(), error);
+ return false;
+ }
+
+ if (nextRow != POISON_PILL) {
+ // there is a valid next row
+ return true;
+ }
+
+ // Received the POISON_PILL. Did the elastic query terminate
gracefully?
+ Throwable systemError = systemErrorRef.get();
+ if (systemError == null) {
+ return false; // No more results, graceful termination
+ } else {
+            throw new IllegalStateException("Error while fetching results from
Elastic for [" + indexPlan.getFilter() + "]", systemError);
}
- return !POISON_PILL.path.equals(nextRow.path);
}
@Override
@@ -145,7 +176,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
}
}
FulltextResultRow row = null;
- if (nextRow != null && !POISON_PILL.path.equals(nextRow.path)) {
+ if (nextRow != null && nextRow != POISON_PILL) {
row = nextRow;
nextRow = null;
}
@@ -162,8 +193,15 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
}
LOG.trace("Path {} satisfies hierarchy inclusion rules", path);
try {
- queue.put(new FulltextResultRow(path, searchHit.score() !=
null ? searchHit.score() : 0.0,
- elasticResponseHandler.excerpts(searchHit),
elasticFacetProvider, null));
+ FulltextResultRow resultRow = new FulltextResultRow(path,
searchHit.score() != null ? searchHit.score() : 0.0,
+ elasticResponseHandler.excerpts(searchHit),
elasticFacetProvider, null);
+ long startNs = System.nanoTime();
+ boolean successful = queue.offer(resultRow, enqueueTimeoutMs,
TimeUnit.MILLISECONDS);
+ if (!successful) {
+                    // if we cannot insert the result into the queue in time,
fail; the error is handled upstream, stopping further processing
+ throw new IllegalStateException("Timeout waiting to insert
result into the iterator queue for path: " + path +
+ ". Waited " + (System.nanoTime() - startNs) /
1_000_000 + " ms");
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt
status
throw new IllegalStateException("Error producing results into
the iterator queue", e);
@@ -176,7 +214,10 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
@Override
public void endData() {
try {
- queue.put(POISON_PILL);
+ boolean success = queue.offer(POISON_PILL, enqueueTimeoutMs,
TimeUnit.MILLISECONDS);
+ if (!success) {
+ LOG.warn("Timeout waiting to insert poison pill into the
iterator queue. The iterator might not be closed properly.");
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore interrupt status
throw new IllegalStateException("Error inserting poison pill into
the iterator queue", e);
@@ -207,7 +248,12 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
@Override
public void close() {
- elasticQueryScanner.close();
+ if (isClosed.compareAndSet(false, true)) {
+ LOG.debug("Closing ElasticResultRowAsyncIterator for index {}",
indexNode.getDefinition().getIndexPath());
+ elasticQueryScanner.close();
+ } else {
+ LOG.warn("ElasticResultRowAsyncIterator for index {} is already
closed", indexNode.getDefinition().getIndexPath());
+ }
}
/**
@@ -230,6 +276,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
// concurrent data structures to coordinate chunks loading
private final AtomicBoolean anyDataLeft = new AtomicBoolean(false);
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
private int scannedRows;
private int requests;
@@ -241,6 +288,7 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
// Semaphore to guarantee only one in-flight request to Elastic
private final Semaphore semaphore = new Semaphore(1);
+        private volatile CompletableFuture<SearchResponse<ObjectNode>>
ongoingRequest;
ElasticQueryScanner(List<ElasticResponseListener> listeners) {
this.query = elasticRequestHandler.baseQuery();
@@ -290,18 +338,18 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
);
LOG.trace("Kicking initial search for query {}", searchRequest);
- semaphore.tryAcquire();
+ boolean permitAcquired = semaphore.tryAcquire();
+ if (!permitAcquired) {
+ LOG.warn("Semaphore not acquired for initial search, scanner
is closing or still processing data from the previous scan");
+ throw new IllegalStateException("Scanner is closing or still
processing data from the previous scan");
+ }
searchStartTime = System.currentTimeMillis();
requests++;
- indexNode.getConnection().getAsyncClient()
+ ongoingRequest = indexNode.getConnection().getAsyncClient()
.search(searchRequest, ObjectNode.class)
- .whenComplete(((searchResponse, throwable) -> {
- if (throwable != null) {
- onFailure(throwable);
- } else onSuccess(searchResponse);
- }));
+                    .whenComplete(this::handleResponse);
metricHandler.markQuery(indexNode.getDefinition().getIndexPath(),
true);
}
@@ -312,16 +360,15 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
* Some code in this method relies on structure that are not thread
safe. We need to make sure
* these data structures are modified before releasing the semaphore.
*/
- public void onSuccess(SearchResponse<ObjectNode> searchResponse) {
+ private void onSuccess(@NotNull SearchResponse<ObjectNode>
searchResponse) {
long searchTotalTime = System.currentTimeMillis() -
searchStartTime;
-
List<Hit<ObjectNode>> searchHits = searchResponse.hits().hits();
int hitsSize = searchHits != null ? searchHits.size() : 0;
metricHandler.measureQuery(indexNode.getDefinition().getIndexPath(), hitsSize,
searchResponse.took(),
searchTotalTime, searchResponse.timedOut());
if (hitsSize > 0) {
long totalHits = searchResponse.hits().total().value();
- LOG.debug("Processing search response that took {} to read
{}/{} docs", searchResponse.took(), hitsSize, totalHits);
+ LOG.debug("Processing search response that took {} ms to read
{}/{} docs", searchResponse.took(), hitsSize, totalHits);
lastHitSortValues = searchHits.get(hitsSize - 1).sort();
scannedRows += hitsSize;
if (searchResponse.hits().total().relation() ==
TotalHitsRelation.Eq) {
@@ -374,12 +421,12 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
}
}
- public void onFailure(Throwable t) {
+ private void onFailure(@NotNull Throwable t) {
metricHandler.measureFailedQuery(indexNode.getDefinition().getIndexPath(),
System.currentTimeMillis() - searchStartTime);
// Check in case errorRef is already set - this seems unlikely
since we close the scanner once we hit failure.
// But still, in case this do happen, we will log a warning.
- Throwable error = errorRef.getAndSet(t);
+ Throwable error = queryErrorRef.getAndSet(t);
if (error != null) {
LOG.warn("Error reference for async iterator was previously
set to {}. It has now been reset to new error {}", error.getMessage(),
t.getMessage());
}
@@ -399,6 +446,10 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
* Triggers a scan of a new chunk of the result set, if needed.
*/
private void scan() {
+ if (isClosed.get()) {
+ LOG.debug("Scanner is closed, ignoring scan request");
+ return;
+ }
if (semaphore.tryAcquire() && anyDataLeft.get()) {
final SearchRequest searchReq = SearchRequest.of(s -> s
.index(indexNode.getDefinition().getIndexAlias())
@@ -415,19 +466,45 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
LOG.trace("Kicking new search after query {}", searchReq);
searchStartTime = System.currentTimeMillis();
- indexNode.getConnection().getAsyncClient()
+ ongoingRequest = indexNode.getConnection().getAsyncClient()
.search(searchReq, ObjectNode.class)
- .whenComplete(((searchResponse, throwable) -> {
- if (throwable != null) {
- onFailure(throwable);
- } else onSuccess(searchResponse);
- }));
+ .whenComplete(this::handleResponse);
metricHandler.markQuery(indexNode.getDefinition().getIndexPath(), false);
} else {
LOG.trace("Scanner is closing or still processing data from
the previous scan");
}
}
+ private void handleResponse(SearchResponse<ObjectNode> searchResponse,
Throwable throwable) {
+ ongoingRequest = null;
+ if (isClosed.get()) {
+ LOG.info("Scanner is closed, not processing search response");
+ return;
+ }
+ try {
+ if (throwable == null) {
+ onSuccess(searchResponse);
+ } else {
+ onFailure(throwable);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Error processing search response", t);
+ Throwable prevValue = systemErrorRef.getAndSet(t);
+ if (prevValue != null) {
+ LOG.warn("System error reference was previously set to {}.
It has now been reset to new error {}", prevValue.getMessage(), t.getMessage());
+ }
+ try {
+ if (!queue.offer(POISON_PILL, enqueueTimeoutMs,
TimeUnit.MILLISECONDS)) {
+ LOG.warn("Timeout waiting to enqueue poison pill after
error processing search response. The iterator might not be closed properly.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt
status
+ LOG.warn("Interrupted while trying to enqueue poison pill
after error processing search response", e);
+ }
+ // This method should not throw exceptions, see the
whenComplete() contract.
+ }
+ }
+
/* picks the size in the fetch array at index=requests or the last if
out of bound */
private int getFetchSize(int requestId) {
int[] queryFetchSizes = indexNode.getDefinition().queryFetchSizes;
@@ -435,11 +512,26 @@ public class ElasticResultRowAsyncIterator implements
ElasticQueryIterator, Elas
queryFetchSizes[requestId] :
queryFetchSizes[queryFetchSizes.length - 1];
}
- // close all listeners
private void close() {
- semaphore.release();
- for (ElasticResponseListener l : allListeners) {
- l.endData();
+ if (isClosed.compareAndSet(false, true)) {
+ LOG.debug("Closing ElasticQueryScanner for index {}",
indexNode.getDefinition().getIndexPath());
+ // Close listeners and release the semaphore
+ semaphore.release();
+ for (ElasticResponseListener l : allListeners) {
+ try {
+ l.endData();
+ } catch (Exception ex) {
+ LOG.warn("Error while closing listener {}",
l.getClass().getName(), ex);
+ }
+ }
+ allListeners.clear();
+
+ if (ongoingRequest != null) {
+ ongoingRequest.cancel(true);
+ ongoingRequest = null;
+ }
+ } else {
+ LOG.info("ElasticQueryScanner for index {} is already closed",
indexNode.getDefinition().getIndexPath());
}
}
}
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
index ff1ec47812..d5fc07271e 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticAbstractQueryTest.java
@@ -157,12 +157,13 @@ public abstract class ElasticAbstractQueryTest extends
AbstractQueryTest {
nodeStore = getNodeStore();
QueryEngineSettings queryEngineSettings = new QueryEngineSettings();
queryEngineSettings.setInferenceEnabled(isInferenceEnabled());
+ queryEngineSettings.setLimitReads(limitReads());
InferenceConfig.reInitialize(nodeStore, INFERENCE_CONFIG_PATH,
isInferenceEnabled());
indexTracker = new ElasticIndexTracker(esConnection,
getMetricHandler());
ElasticIndexEditorProvider editorProvider = new
ElasticIndexEditorProvider(indexTracker, esConnection,
new ExtractedTextCache(10 * FileUtils.ONE_MB, 100),
getElasticRetryPolicy());
- ElasticIndexProvider indexProvider = new
ElasticIndexProvider(indexTracker);
+ ElasticIndexProvider indexProvider = new
ElasticIndexProvider(indexTracker, getAsyncIteratorEnqueueTimeoutMs());
asyncIndexUpdate = getAsyncIndexUpdate("async", nodeStore,
CompositeIndexEditorProvider.compose(
@@ -189,6 +190,14 @@ public abstract class ElasticAbstractQueryTest extends
AbstractQueryTest {
return oak.createContentRepository();
}
+ protected long getAsyncIteratorEnqueueTimeoutMs() {
+ return ElasticIndexProvider.DEFAULT_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS;
+ }
+
+ protected long limitReads() {
+ return QueryEngineSettings.DEFAULT_QUERY_LIMIT_READS;
+ }
+
protected boolean isInferenceEnabled() {
return true;
}
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
index 3cd213e601..0a8cc33e5c 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticIndexProviderServiceTest.java
@@ -20,6 +20,7 @@ import org.apache.jackrabbit.oak.osgi.OsgiWhiteboard;
import org.apache.jackrabbit.oak.plugins.index.AsyncIndexInfoService;
import org.apache.jackrabbit.oak.plugins.index.IndexEditorProvider;
import
org.apache.jackrabbit.oak.plugins.index.elastic.index.ElasticIndexEditorProvider;
+import
org.apache.jackrabbit.oak.plugins.index.elastic.query.ElasticIndexProvider;
import
org.apache.jackrabbit.oak.plugins.index.elastic.query.inference.InferenceConfig;
import org.apache.jackrabbit.oak.plugins.memory.MemoryNodeStore;
import org.apache.jackrabbit.oak.query.QueryEngineSettings;
@@ -33,6 +34,7 @@ import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticsProvider;
import org.apache.sling.testing.mock.osgi.MockOsgi;
import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
+import org.jetbrains.annotations.Nullable;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
@@ -48,6 +50,7 @@ import java.util.concurrent.TimeUnit;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_DISABLED;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_API_KEY_ID;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_API_KEY_SECRET;
+import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_HOST;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_MAX_RETRY_TIME;
import static
org.apache.jackrabbit.oak.plugins.index.elastic.ElasticIndexProviderService.PROP_ELASTIC_PORT;
@@ -190,14 +193,28 @@ public class ElasticIndexProviderServiceTest {
MockOsgi.activate(service, context.bundleContext(), props);
assertNotNull(context.getService(QueryIndexProvider.class));
- assertNotNull(context.getService(IndexEditorProvider.class));
-
- ElasticIndexEditorProvider editorProvider =
(ElasticIndexEditorProvider) context.getService(IndexEditorProvider.class);
+ IndexEditorProvider indexEditorProvider =
context.getService(IndexEditorProvider.class);
+ assertNotNull(indexEditorProvider);
+ ElasticIndexEditorProvider editorProvider =
(ElasticIndexEditorProvider) indexEditorProvider;
assertEquals(TimeUnit.SECONDS.toMillis(600),
editorProvider.getRetryPolicy().getMaxRetryTimeMs());
MockOsgi.deactivate(service, context.bundleContext());
}
+ @Test
+ public void withAsyncIteratorEnqueueTimeoutMs() {
+ Map<String, Object> props = new HashMap<>(getElasticConfig());
+ props.put(PROP_ELASTIC_ASYNC_ITERATOR_ENQUEUE_TIMEOUT_MS, 123);
+ MockOsgi.activate(service, context.bundleContext(), props);
+
+ QueryIndexProvider queryIndexProvider =
context.getService(QueryIndexProvider.class);
+ assertNotNull(queryIndexProvider);
+ ElasticIndexProvider elasticIndexProvider = (ElasticIndexProvider)
queryIndexProvider;
+ assertEquals(123,
elasticIndexProvider.getAsyncIteratorEnqueueTimeoutMs());
+
+ MockOsgi.deactivate(service, context.bundleContext());
+ }
+
private HashMap<String, Object> getElasticConfig() {
HashMap<String, Object> config = new HashMap<>();
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySlowReaderQueryTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySlowReaderQueryTest.java
new file mode 100644
index 0000000000..ba4372d2ee
--- /dev/null
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySlowReaderQueryTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.jackrabbit.oak.plugins.index.elastic;
+
+import org.apache.jackrabbit.oak.api.CommitFailedException;
+import org.apache.jackrabbit.oak.api.Result;
+import org.apache.jackrabbit.oak.api.ResultRow;
+import org.apache.jackrabbit.oak.api.Tree;
+import org.apache.jackrabbit.oak.api.Type;
+import
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
+import org.apache.jackrabbit.oak.spi.query.QueryConstants;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+
+import static org.apache.jackrabbit.oak.api.QueryEngine.NO_BINDINGS;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ElasticReliabilitySlowReaderQueryTest extends
ElasticAbstractQueryTest {
+
+ private static final String QUERY_PROP_A = "select [jcr:path] from
[nt:base] where propa is not null";
+
+ @Override
+ protected long getAsyncIteratorEnqueueTimeoutMs() {
+ return 1000;
+ }
+
+ @Override
+ protected long limitReads() {
+ return 2;
+ }
+
+ /**
+ * This tests the case where a reader thread is very slow reading the
results and the ElasticResultRowAsyncIterator
+ * times out enqueuing results in its internal result queue. To trigger
a timeout in the ElasticResultRowAsyncIterator,
+ * we set the timeout for the internal queue to 1s and limit the number of
results that can be stored in the queue to 2.
+ * The following should happen:
+ * - the first read will succeed because the reader thread reads it
immediately.
+ * - The reader thread waits for 2 seconds. During this time, the iterator
reads 2 more results and then blocks trying
+ * to enqueue the 4th result because the queue is full. After 1 second of
waiting, it times out and closes the iterator.
+ * - the reader thread wakes up and tries to continue reading. It reads
the next two results which were successfully
+ * put in the queue before the iterator timed out, even though the
iterator is already closed.
+ * - Then it fails to read the 4th result and receives an exception
indicating that the iterator has timed out.
+ */
+ @Test
+ public void slowReader() throws Exception {
+ String indexName = UUID.randomUUID().toString();
+ IndexDefinitionBuilder builder = createIndex("propa");
+ setIndex(indexName, builder);
+ root.commit();
+
+ // Populate the index
+ addNodes(6);
+
+ // simulate a slow reader: read the first result, wait for 2 seconds, then continue reading
+ Result result = executeQuery(QUERY_PROP_A, SQL2, NO_BINDINGS);
+ ArrayList<String> resultPaths = new ArrayList<>();
+ Iterator<? extends ResultRow> resultRows = result.getRows().iterator();
+ // Read the first result immediately
+ assertTrue(resultRows.hasNext());
+
resultPaths.add(resultRows.next().getValue(QueryConstants.JCR_PATH).getValue(Type.STRING));
+ Thread.sleep(2000L); // Simulate slow reading
+ // The iterator should have timed out trying to enqueue the next
result. The next two results should still be
+ // available because they were enqueued before the queue got full and
the iterator timed out.
+ assertTrue(resultRows.hasNext());
+
resultPaths.add(resultRows.next().getValue(QueryConstants.JCR_PATH).getValue(Type.STRING));
+ assertTrue(resultRows.hasNext());
+
resultPaths.add(resultRows.next().getValue(QueryConstants.JCR_PATH).getValue(Type.STRING));
+ // The next read should fail
+ try {
+ assertFalse(resultRows.hasNext());
+ fail("Expected an exception while reading results");
+ } catch (IllegalStateException e) {
+ assertThat(e.getMessage(), containsString("Error while fetching
results from Elastic"));
+ }
+ assertEquals(List.of("/test/a0", "/test/a1", "/test/a2"), resultPaths);
+ }
+
+ private void addNodes(int count) throws CommitFailedException {
+ Tree test = root.getTree("/").addChild("test");
+ for (int i = 0; i < count; i++) {
+ test.addChild("a" + i).setProperty("propa", "a" + i);
+ }
+ root.commit();
+ this.asyncIndexUpdate.run();
+ assertEventually(() -> assertQuery(QUERY_PROP_A, List.of("/test/a0",
"/test/a1", "/test/a2", "/test/a3", "/test/a4", "/test/a5")));
+ }
+}
diff --git
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
index 0c3d4d1a1f..63dfeee171 100644
---
a/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
+++
b/oak-search-elastic/src/test/java/org/apache/jackrabbit/oak/plugins/index/elastic/ElasticReliabilitySyncIndexingTest.java
@@ -17,9 +17,11 @@
package org.apache.jackrabbit.oak.plugins.index.elastic;
import eu.rekawek.toxiproxy.model.ToxicDirection;
+import eu.rekawek.toxiproxy.model.toxic.Latency;
import eu.rekawek.toxiproxy.model.toxic.LimitData;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.api.Tree;
+import
org.apache.jackrabbit.oak.plugins.index.search.util.IndexDefinitionBuilder;
import org.junit.Test;
import java.util.List;
@@ -29,6 +31,7 @@ import java.util.UUID;
import static org.hamcrest.CoreMatchers.containsString;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.fail;
public class ElasticReliabilitySyncIndexingTest extends ElasticReliabilityTest
{
@@ -72,6 +75,44 @@ public class ElasticReliabilitySyncIndexingTest extends
ElasticReliabilityTest {
});
}
+ @Test
+ public void elasticQueryTimeout() throws Exception {
+ String query = "select [jcr:path] from [nt:base] where propa is not
null";
+
+ // Simulate a timeout of a query to the Elasticsearch cluster by
setting a very low Oak Query timeout (500 ms)
+ // and a high latency on the connection to Elastic. This way, the
ElasticResultRowAsyncIterator will time out
+ // waiting for the response from Elastic.
+ // Index with a very low query timeout, 500 ms
+ IndexDefinitionBuilder builderPropAIndex = createIndex("propa");
+
builderPropAIndex.getBuilderTree().setProperty(ElasticIndexDefinition.QUERY_TIMEOUT_MS,
500L);
+ setIndex(UUID.randomUUID().toString(), builderPropAIndex);
+
+ root.commit();
+
+ Tree test = root.getTree("/").addChild("test");
+ test.addChild("a").setProperty("propa", "a");
+ root.commit();
+
+ // Wait for the index to be updated
+ assertEventually(() -> assertQuery(query, List.of("/test/a")));
+
+ // simulate Elastic taking a long time to respond
+ Latency slowQueryToxic = proxy.toxics()
+ .latency("slow_query", ToxicDirection.DOWNSTREAM, 1000L);
+ try {
+ // This Oak query should time out after 500 ms
+ executeQuery(query, SQL2);
+ fail("Expected a timeout exception");
+ } catch (IllegalStateException e) {
+ LOG.info("Expected timeout exception.", e);
+ assertThat(e.getMessage(), containsString("Timeout"));
+ }
+ slowQueryToxic.remove();
+
+ // After removing the latency toxic, the query should succeed
+ assertEventually(() -> assertQuery(query, List.of("/test/a")));
+ }
+
@Test
public void connectionCutOnIndex() throws Exception {
String indexName = UUID.randomUUID().toString();