gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3268055039
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -207,19 +388,48 @@ private QueryResult tryRecover(long requestId, Set<QueryServerInstance> servers,
} else if (ex instanceof QueryException) {
errorCode = ((QueryException) ex).getErrorCode();
} else {
- // in case of unknown exceptions, the exception will be rethrown, so we don't need stats
cancel(requestId, servers);
throw ex;
}
- // in case of known exceptions (timeout or query exception), we need can
build here the erroneous QueryResult
- // that include the stats.
- LOGGER.warn("Query failed with a known exception. Trying to cancel the other opchains");
- MultiStageQueryStats stats = cancelWithStats(requestId, servers);
- if (stats == null) {
+ LOGGER.warn("Query failed with a known exception. Cancelling remaining opchains.");
+ cancel(requestId, servers);
+ QueryProcessingException processingException = new QueryProcessingException(errorCode, ex.getMessage());
+ return new QueryResult(processingException, MultiStageQueryStats.emptyStats(0), 0L);
+ }
+
+ /// Tries to recover from an exception thrown during stream-mode ({@code SubmitWithStream}) query dispatching.
+ ///
+ /// Fans out cancel over the open streams, waits briefly for any remaining {@code OpChainComplete} messages (up to
+ /// the query deadline), and builds a {@link QueryResult} that includes whatever stats arrived before the deadline.
+ /// Stats from before the error are available because servers push {@code OpChainComplete} even on failure.
+ ///
+ /// Unknown exceptions (not {@link TimeoutException} or {@link QueryException}) are re-thrown after cancel fan-out.
+ private QueryResult tryRecoverWithStream(StreamingQuerySession session, Map<Integer, Integer> expectedByStage,
+ long deadlineMs, Exception ex)
+ throws Exception {
+ if (ex instanceof ExecutionException && ex.getCause() instanceof Exception) {
+ ex = (Exception) ex.getCause();
+ }
+ QueryErrorCode errorCode;
+ if (ex instanceof TimeoutException) {
+ errorCode = QueryErrorCode.EXECUTION_TIMEOUT;
+ } else if (ex instanceof QueryException) {
+ errorCode = ((QueryException) ex).getErrorCode();
+ } else {
+ session.fanOutCancel();
throw ex;
}
+ LOGGER.warn("Stream-mode query failed with a known exception. Fanning out cancel and waiting for stats.");
+ session.fanOutCancel();
+ long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
+ try {
+ session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);
Review Comment:
I think this has been fixed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]