agavra commented on code in PR #10117:
URL: https://github.com/apache/pinot/pull/10117#discussion_r1068731323


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -86,19 +90,29 @@ public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
           String host = serverInstance.getHostname();
           int servicePort = serverInstance.getQueryServicePort();
           DispatchClient client = getOrCreateDispatchClient(host, servicePort);
-          Worker.QueryResponse response = client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+          Future<Worker.QueryResponse> response = client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
                   QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId, serverInstance)))
               .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID, String.valueOf(requestId))
               .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS, String.valueOf(timeoutMs)).build());
-          if (response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
-            throw new RuntimeException(
-                String.format("Unable to execute query plan at stage %s on server %s: ERROR: %s", stageId,
-                    serverInstance, response));
-          }
+          querySubmitResponse.add(response);
+        }
+      }
+    }
+    while (System.currentTimeMillis() < submissionDeadlineMs) {
+      for (Future<Worker.QueryResponse> future : querySubmitResponse) {
+        if (!future.isDone()) {
+          break;
+          Thread.sleep(100);
+        }
+      }
+      for (Future<Worker.QueryResponse> future : querySubmitResponse) {
+        if (future.get().containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
+          throw new RuntimeException(
+              String.format("Unable to execute query plan!");

Review Comment:
   Should we cancel the remaining futures before throwing here?
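
   To make the suggestion concrete, here is a minimal sketch of cancelling whatever is still pending once one response fails. `CancelOnFailure` and `cancelPending` are hypothetical names for illustration and are not part of the PR; the sketch also assumes plain `java.util.concurrent.Future` semantics, and whether cancelling actually aborts an in-flight gRPC call depends on the future type the dispatch client returns.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class CancelOnFailure {
  // Hypothetical helper (not in the PR): cancels every future that has not
  // completed yet and returns how many were cancelled.
  static <T> int cancelPending(List<? extends Future<T>> futures) {
    int cancelled = 0;
    for (Future<T> future : futures) {
      // cancel(true) asks the task to be interrupted if it is already running.
      if (!future.isDone() && future.cancel(true)) {
        cancelled++;
      }
    }
    return cancelled;
  }

  public static void main(String[] args) {
    List<Future<String>> futures = new ArrayList<>();
    futures.add(CompletableFuture.completedFuture("ok"));
    CompletableFuture<String> pending = new CompletableFuture<>();
    futures.add(pending);

    // Simulate the error path: one response failed, so clean up the rest.
    int cancelled = cancelPending(futures);
    System.out.println("cancelled=" + cancelled + ", pending.isCancelled=" + pending.isCancelled());
  }
}
```

   In the dispatcher this would run just before the `RuntimeException` is thrown, so no submission is left dangling after the broker has already given up.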



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -75,6 +77,8 @@ public ResultTable submitAndReduce(long requestId, QueryPlan queryPlan,
   public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
       throws Exception {
     int reduceStageId = -1;
+    long submissionDeadlineMs = System.currentTimeMillis() + DEFAULT_SUBMISSION_DEADLINE_MS;

Review Comment:
   This timeout only covers the time until the servers receive the query, 
right? It has nothing to do with executing the query? If so, 1s seems like a 
pretty large default (but probably OK).
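
   For reference, a minimal sketch of what a submission-only deadline could look like: every acknowledgement is awaited against one shared deadline, independent of the query's own execution timeout. `SubmissionDeadline` and `awaitAll` are hypothetical names, not code from the PR, and the sketch assumes the acknowledgements are ordinary `Future`s.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubmissionDeadline {
  // Hypothetical helper: waits for every submission acknowledgement, but never
  // past the shared deadline. This bounds dispatch only, not query execution.
  static <T> void awaitAll(List<? extends Future<T>> futures, long deadlineMs)
      throws InterruptedException, ExecutionException, TimeoutException {
    for (Future<T> future : futures) {
      long remainingMs = deadlineMs - System.currentTimeMillis();
      // A zero timeout still returns immediately for an already-completed future
      // and throws TimeoutException for one that is not done.
      future.get(Math.max(0L, remainingMs), TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) throws Exception {
    List<Future<String>> acks = new ArrayList<>();
    acks.add(CompletableFuture.completedFuture("ack"));
    acks.add(new CompletableFuture<>()); // a server that never acknowledges

    try {
      awaitAll(acks, System.currentTimeMillis() + 50);
      System.out.println("all servers acknowledged");
    } catch (TimeoutException e) {
      System.out.println("submission deadline exceeded");
    }
  }
}
```

   Blocking on `get` with the remaining time also avoids a sleep-and-poll loop.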



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryServer.java:
##########
@@ -45,13 +48,16 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
   private final Server _server;
   private final QueryRunner _queryRunner;
+  private final ExecutorService _executorService;
 
   public QueryServer(int port, QueryRunner queryRunner) {
     _server = ServerBuilder.forPort(port).addService(this).build();
     _queryRunner = queryRunner;
+    _executorService = Executors.newFixedThreadPool(ResourceManager.DEFAULT_QUERY_RUNNER_THREADS,

Review Comment:
   Can we avoid creating another pool and instead reuse one of the pools that 
already exist? Maybe we should pass this one into the query runner and then 
into the OpChainSchedulerService, or the other way around.
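
   A rough sketch of the shape I have in mind: one pool is created by the owner and injected into every component that needs it. `Server` and `Runner` below are hypothetical stand-ins for illustration only; they are not the real `QueryServer` or `QueryRunner` signatures.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SharedPoolSketch {
  // Hypothetical stand-in for a component that executes work on the pool.
  static class Runner {
    private final ExecutorService _executorService;

    Runner(ExecutorService executorService) {
      _executorService = executorService;
    }

    Future<String> run(String query) {
      return _executorService.submit(() -> "ran " + query);
    }
  }

  // Hypothetical stand-in for the server: it receives the pool, it does not create one.
  static class Server {
    private final ExecutorService _executorService;

    Server(ExecutorService executorService) {
      _executorService = executorService;
    }

    Future<String> accept(String query) {
      return _executorService.submit(() -> "accepted " + query);
    }
  }

  public static void main(String[] args) throws Exception {
    // One pool, owned by the caller and shared by both components.
    ExecutorService shared = Executors.newFixedThreadPool(2);
    try {
      Server server = new Server(shared);
      Runner runner = new Runner(shared);
      System.out.println(server.accept("q1").get());
      System.out.println(runner.run("q1").get());
    } finally {
      shared.shutdown();
    }
  }
}
```

   Whoever creates the pool also owns its shutdown, which keeps thread counts and lifecycle in one place.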



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

