mkapalka commented on code in PR #2410:
URL: https://github.com/apache/jackrabbit-oak/pull/2410#discussion_r2245653285
##########
oak-search-elastic/src/main/java/org/apache/jackrabbit/oak/plugins/index/elastic/query/async/ElasticResultRowAsyncIterator.java:
##########
@@ -415,31 +466,72 @@ private void scan() {
LOG.trace("Kicking new search after query {}", searchReq);
searchStartTime = System.currentTimeMillis();
- indexNode.getConnection().getAsyncClient()
+ ongoingRequest = indexNode.getConnection().getAsyncClient()
.search(searchReq, ObjectNode.class)
- .whenComplete(((searchResponse, throwable) -> {
- if (throwable != null) {
- onFailure(throwable);
- } else onSuccess(searchResponse);
- }));
+ .whenComplete(this::handleResponse);
metricHandler.markQuery(indexNode.getDefinition().getIndexPath(), false);
} else {
LOG.trace("Scanner is closing or still processing data from the previous scan");
}
}
+ private void handleResponse(SearchResponse<ObjectNode> searchResponse, Throwable throwable) {
+ ongoingRequest = null;
+ if (isClosed.get()) {
+ LOG.info("Scanner is closed, not processing search response");
+ return;
+ }
+ try {
+ if (throwable == null) {
+ onSuccess(searchResponse);
+ } else {
+ onFailure(throwable);
+ }
+ } catch (Throwable t) {
+ LOG.warn("Error processing search response", t);
+ Throwable prevValue = systemErrorRef.getAndSet(t);
+ if (prevValue != null) {
+ LOG.warn("System error reference was previously set to {}. It has now been reset to new error {}", prevValue.getMessage(), t.getMessage());
+ }
+ try {
+ if (!queue.offer(POISON_PILL, enqueueTimeoutMs, TimeUnit.MILLISECONDS)) {
+ LOG.warn("Timeout waiting to enqueue poison pill after error processing search response. The iterator might not be closed properly.");
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore interrupt status
+ LOG.warn("Interrupted while trying to enqueue poison pill after error processing search response", e);
+ }
+ throw t;
Review Comment:
Method `handleResponse` is used only as the argument to `whenComplete`, whose
Javadoc says: "this method is not designed to translate completion
outcomes, so the supplied action should not throw an exception". The
resulting stage is also never used to obtain results (it is only used for
cancelling the task), so a rethrown `t` would go unobserved. Would it be safer
to close the scanner here instead of throwing `t`, so that the error
propagates quickly to the iterator?
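
To illustrate the concern, here is a minimal, self-contained sketch (not the
Oak code). The class `WhenCompleteDemo`, the `closed` flag, and both method
names are hypothetical stand-ins. It assumes an already-completed future, so
the callback runs on the calling thread. It shows that an exception thrown
from a `whenComplete` action only completes the returned stage exceptionally,
and that handling the error inside the callback (here, by flipping a flag that
stands in for closing the scanner) makes the failure visible without relying
on anyone inspecting that stage.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class WhenCompleteDemo {
    // Hypothetical stand-in for the scanner's "closed" state.
    static final AtomicBoolean closed = new AtomicBoolean(false);

    // The callback throws: the exception does not reach the caller of
    // whenComplete; it is captured in the returned stage only.
    static CompletableFuture<String> throwingCallback() {
        return CompletableFuture.completedFuture("response")
                .whenComplete((result, error) -> {
                    throw new IllegalStateException("failure while processing response");
                });
    }

    // The callback handles the failure itself, e.g. by closing the consumer,
    // and does not rethrow.
    static CompletableFuture<String> closingCallback() {
        return CompletableFuture.completedFuture("response")
                .whenComplete((result, error) -> {
                    try {
                        throw new IllegalStateException("failure while processing response");
                    } catch (Throwable t) {
                        closed.set(true); // assumption: stands in for closing the scanner
                    }
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> thrown = throwingCallback();
        // Nothing was thrown on this thread; the failure is only visible
        // if someone inspects the returned stage.
        System.out.println("throwing callback, stage failed: " + thrown.isCompletedExceptionally());

        CompletableFuture<String> handled = closingCallback();
        System.out.println("closing callback, stage failed: " + handled.isCompletedExceptionally());
        System.out.println("closed flag set: " + closed.get());
    }
}
```

If the stage returned by `whenComplete` is kept only for cancellation, as in
this PR, the first variant leaves the error unobserved unless some other
signal (such as the poison pill) reaches the consumer.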