walterddr commented on code in PR #10117:
URL: https://github.com/apache/pinot/pull/10117#discussion_r1068807148
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -75,6 +77,8 @@ public ResultTable submitAndReduce(long requestId, QueryPlan
queryPlan,
public int submit(long requestId, QueryPlan queryPlan, long timeoutMs)
throws Exception {
int reduceStageId = -1;
+ long submissionDeadlineMs = System.currentTimeMillis() +
DEFAULT_SUBMISSION_DEADLINE_MS;
Review Comment:
yeah i am not so convince this is the right way to do async dispatch. i am
open to all suggestions. please definitely share your feedback
1. use FutureStub
2. use obsererStub
3. use blockingStub with timeout
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -86,19 +90,29 @@ public int submit(long requestId, QueryPlan queryPlan, long
timeoutMs)
String host = serverInstance.getHostname();
int servicePort = serverInstance.getQueryServicePort();
DispatchClient client = getOrCreateDispatchClient(host, servicePort);
- Worker.QueryResponse response =
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+ Future<Worker.QueryResponse> response =
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(requestId))
.putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs)).build());
- if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
- throw new RuntimeException(
- String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", stageId,
- serverInstance, response));
- }
+ querySubmitResponse.add(response);
+ }
+ }
+ }
+ while (System.currentTimeMillis() < submissionDeadlineMs) {
+ for (Future<Worker.QueryResponse> future : querySubmitResponse) {
+ if (!future.isDone()) {
+ break;
+ Thread.sleep(100);
+ }
+ }
+ for (Future<Worker.QueryResponse> future : querySubmitResponse) {
+ if
(future.get().containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR))
{
Review Comment:
no it doesn't
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]