walterddr commented on code in PR #10332:
URL: https://github.com/apache/pinot/pull/10332#discussion_r1119050545
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -220,23 +252,94 @@ public static MailboxReceiveOperator
createReduceStageOperator(MailboxService<Tr
public void shutdown() {
for (DispatchClient dispatchClient : _dispatchClientMap.values()) {
- dispatchClient._managedChannel.shutdown();
+ dispatchClient._channel.shutdown();
}
_dispatchClientMap.clear();
}
+ @VisibleForTesting
+ DispatchClient getOrCreateDispatchClient(String host, int port) {
+ String key = String.format("%s_%d", host, port);
+ return _dispatchClientMap.computeIfAbsent(key, k -> new
DispatchClient(host, port));
+ }
+
public static class DispatchClient {
- private final PinotQueryWorkerGrpc.PinotQueryWorkerBlockingStub
_blockingStub;
- private final ManagedChannel _managedChannel;
+ private ManagedChannel _channel;
+ private PinotQueryWorkerGrpc.PinotQueryWorkerStub _dispatchStub;
public DispatchClient(String host, int port) {
- ManagedChannelBuilder managedChannelBuilder =
ManagedChannelBuilder.forAddress(host, port).usePlaintext();
- _managedChannel = managedChannelBuilder.build();
- _blockingStub = PinotQueryWorkerGrpc.newBlockingStub(_managedChannel);
+ _channel = ManagedChannelBuilder.forAddress(host,
port).usePlaintext().build();
Review Comment:
nit: let's move `DispatchClient` and `DispatchObserver` into their own files
under `org.apache.pinot.query.service.dispatch`. That makes the code cleaner.
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -99,28 +116,43 @@ public int submit(long requestId, QueryPlan queryPlan,
long timeoutMs, Map<Strin
String host = serverInstance.getHostname();
int servicePort = serverInstance.getQueryServicePort();
DispatchClient client = getOrCreateDispatchClient(host, servicePort);
- Worker.QueryResponse response =
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(requestId))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs))
- .putAllMetadata(queryOptions).build());
-
- if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
- throw new RuntimeException(
- String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", stageId,
- serverInstance, response));
+ dispatchCalls++;
+ _executorService.submit(() -> {
+ client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
+ .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(requestId))
+ .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs))
+ .putAllMetadata(queryOptions).build(), stageId,
serverInstance, deadline, callbacks::offer);
+ });
+ }
+ }
+ }
+ for (int i = 0; i < dispatchCalls; i++) {
+ AsyncResponse resp;
+ while (!deadline.isExpired()) {
+ resp = callbacks.poll(DEFAULT_DISPATCHER_CALLBACK_POLL_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ if (resp != null) {
+ if (resp.getThrowable() != null) {
+ throw new RuntimeException(String.format("Error dispatching query
to server=%s stage=%s",
+ resp._virtualServer, resp._stageId), resp.getThrowable());
+ } else {
+ Worker.QueryResponse response = resp.getQueryResponse();
+ if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
+ throw new RuntimeException(
+ String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", resp.getStageId(),
+ resp.getVirtualServer(), response));
+ }
+ break;
Review Comment:
Not sure why we need a separate deadline-expiry check plus two nested loops.
Can't we just do:
```
while (!deadline.isExpired() && successfulDispatchCount < dispatchCalls) {
resp = callbacks.poll(TIMEOUT);
if (...) {
.... // all the error conditions.
} else {
successfulDispatchCount++;
}
}
```
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java:
##########
@@ -99,28 +116,43 @@ public int submit(long requestId, QueryPlan queryPlan,
long timeoutMs, Map<Strin
String host = serverInstance.getHostname();
int servicePort = serverInstance.getQueryServicePort();
DispatchClient client = getOrCreateDispatchClient(host, servicePort);
- Worker.QueryResponse response =
client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
-
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(requestId))
- .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs))
- .putAllMetadata(queryOptions).build());
-
- if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
- throw new RuntimeException(
- String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", stageId,
- serverInstance, response));
+ dispatchCalls++;
+ _executorService.submit(() -> {
+ client.submit(Worker.QueryRequest.newBuilder().setStagePlan(
+
QueryPlanSerDeUtils.serialize(constructDistributedStagePlan(queryPlan, stageId,
serverInstance)))
+ .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_ID,
String.valueOf(requestId))
+ .putMetadata(QueryConfig.KEY_OF_BROKER_REQUEST_TIMEOUT_MS,
String.valueOf(timeoutMs))
+ .putAllMetadata(queryOptions).build(), stageId,
serverInstance, deadline, callbacks::offer);
+ });
+ }
+ }
+ }
+ for (int i = 0; i < dispatchCalls; i++) {
+ AsyncResponse resp;
+ while (!deadline.isExpired()) {
+ resp = callbacks.poll(DEFAULT_DISPATCHER_CALLBACK_POLL_TIMEOUT_MS,
TimeUnit.MILLISECONDS);
+ if (resp != null) {
+ if (resp.getThrowable() != null) {
+ throw new RuntimeException(String.format("Error dispatching query
to server=%s stage=%s",
+ resp._virtualServer, resp._stageId), resp.getThrowable());
+ } else {
+ Worker.QueryResponse response = resp.getQueryResponse();
+ if
(response.containsMetadata(QueryConfig.KEY_OF_SERVER_RESPONSE_STATUS_ERROR)) {
+ throw new RuntimeException(
+ String.format("Unable to execute query plan at stage %s on
server %s: ERROR: %s", resp.getStageId(),
+ resp.getVirtualServer(), response));
+ }
+ break;
}
}
}
}
+ if (deadline.isExpired()) {
+ throw new RuntimeException("Timed out waiting for response of async
query-dispatch");
+ }
Review Comment:
Add a comment: `TODO: cancel all ongoing dispatched requests`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]