gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3268055039


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -207,19 +388,48 @@ private QueryResult tryRecover(long requestId, Set<QueryServerInstance> servers,
     } else if (ex instanceof QueryException) {
       errorCode = ((QueryException) ex).getErrorCode();
     } else {
-      // in case of unknown exceptions, the exception will be rethrown, so we don't need stats
       cancel(requestId, servers);
       throw ex;
     }
-    // in case of known exceptions (timeout or query exception), we need can build here the erroneous QueryResult
-    // that include the stats.
-    LOGGER.warn("Query failed with a known exception. Trying to cancel the other opchains");
-    MultiStageQueryStats stats = cancelWithStats(requestId, servers);
-    if (stats == null) {
+    LOGGER.warn("Query failed with a known exception. Cancelling remaining opchains.");
+    cancel(requestId, servers);
+    QueryProcessingException processingException = new QueryProcessingException(errorCode, ex.getMessage());
+    return new QueryResult(processingException, MultiStageQueryStats.emptyStats(0), 0L);
+  }
+
+  /// Tries to recover from an exception thrown during stream-mode ({@code SubmitWithStream}) query dispatching.
+  ///
+  /// Fans out cancel over the open streams, waits briefly for any remaining {@code OpChainComplete} messages (up to
+  /// the query deadline), and builds a {@link QueryResult} that includes whatever stats arrived before the deadline.
+  /// Stats from before the error are available because servers push {@code OpChainComplete} even on failure.
+  ///
+  /// Unknown exceptions (not {@link TimeoutException} or {@link QueryException}) are re-thrown after cancel fan-out.
+  private QueryResult tryRecoverWithStream(StreamingQuerySession session, Map<Integer, Integer> expectedByStage,
+      long deadlineMs, Exception ex)
+      throws Exception {
+    if (ex instanceof ExecutionException && ex.getCause() instanceof Exception) {
+      ex = (Exception) ex.getCause();
+    }
+    QueryErrorCode errorCode;
+    if (ex instanceof TimeoutException) {
+      errorCode = QueryErrorCode.EXECUTION_TIMEOUT;
+    } else if (ex instanceof QueryException) {
+      errorCode = ((QueryException) ex).getErrorCode();
+    } else {
+      session.fanOutCancel();
       throw ex;
     }
+    LOGGER.warn("Stream-mode query failed with a known exception. Fanning out cancel and waiting for stats.");
+    session.fanOutCancel();
+    long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
+    try {
+      session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);

Review Comment:
   I think this has been fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

