Jackie-Jiang commented on code in PR #9171:
URL: https://github.com/apache/pinot/pull/9171#discussion_r946243934
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -85,4 +87,22 @@ public BrokerResponse handleRequest(JsonNode request,
@Nullable RequesterIdentit
}
return _singleStageBrokerRequestHandler.handleRequest(request,
requesterIdentity, requestContext);
}
+
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ if (_multiStageWorkerRequestHandler != null) {
+ return _multiStageWorkerRequestHandler.getRunningQueries();
+ }
+ return _singleStageBrokerRequestHandler.getRunningQueries();
+ }
+
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses)
+ throws Exception {
+ if (_multiStageWorkerRequestHandler != null) {
+ return _multiStageWorkerRequestHandler.cancelQuery(queryId, timeoutMs,
executor, connMgr, serverResponses);
Review Comment:
Same here. In order to support cancelling the correct query, we either need
to track the engine used, or try to cancel in both engines
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java:
##########
@@ -59,6 +60,32 @@ public static String getHelixInstanceId(Instance instance) {
return prefix + instance.getHost() + "_" + instance.getPort();
}
+ public static String getInstanceAdminEndpoint(InstanceConfig instanceConfig,
int defaultPort) {
Review Comment:
Rename it to `getServerAdminEndpoint`. The same logic doesn't apply to other
instance types
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java:
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * API to cancel query running on the server, given a queryId.
+ */
+@Api(tags = "Query", authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+ HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public class QueryResource {
+ @Inject
+ private ServerInstance _serverInstance;
+
+ @DELETE
+ @Path("/query/{queryId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Cancel a query running on the server as identified by
the queryId", notes = "No effect if "
+ + "no query exists for the given queryId. Query may continue to run for
a short while after calling cancel as "
+ + "it's done in a non-blocking manner. The cancel API can be called
multiple times.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error"),
+ @ApiResponse(code = 404, message = "Query not found running on the
server")
+ })
+ public String cancelQuery(
+ @ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>",
required = true) @PathParam("queryId")
+ String queryId) {
+ if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) {
+ return "Cancelled query: " + queryId;
+ }
+ throw new WebApplicationException(
+ Response.status(Response.Status.NOT_FOUND).entity("Query: " + queryId
+ " not found on the server").build());
+ }
+
+ @GET
+ @Path("/query/id")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get queryIds of running queries on the server", notes
= "QueryIds are in the format of "
+ + "<brokerId>_<requestId>")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error")
+ })
+ public Set<String> getRunningQueries() {
Review Comment:
(minor) `getRunningQueryIds()` to be more clear
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java:
##########
@@ -59,6 +60,32 @@ public static String getHelixInstanceId(Instance instance) {
return prefix + instance.getHost() + "_" + instance.getPort();
}
+ public static String getInstanceAdminEndpoint(InstanceConfig instanceConfig,
int defaultPort) {
Review Comment:
(minor) We may simplify it to not take the default port since it is already
defined as a constant
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java:
##########
@@ -85,4 +87,22 @@ public BrokerResponse handleRequest(JsonNode request,
@Nullable RequesterIdentit
}
return _singleStageBrokerRequestHandler.handleRequest(request,
requesterIdentity, requestContext);
}
+
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ if (_multiStageWorkerRequestHandler != null) {
+ return _multiStageWorkerRequestHandler.getRunningQueries();
Review Comment:
This part is tricky. Even if multi-stage engine is configured, we might
still run query using the single stage engine.
For now we can probably always delegate the request to the single-stage
engine, and add a TODO to support it in the multi-stage engine. In the future,
we may return queries from both engines
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -105,6 +117,73 @@ public QueryScheduler(PinotConfiguration config,
QueryExecutor queryExecutor, Re
*/
public abstract ListenableFuture<byte[]> submit(ServerQueryRequest
queryRequest);
+ /**
+ * Submit a query for execution and track runtime context about the query
for things like cancellation.
+ * @param queryRequest query to schedule for execution
+ * @return Listenable future for query result representing serialized
response. Custom callbacks can be added on
+ * the future to clean up the runtime context tracked during query execution.
+ */
+ public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest)
{
+ ListenableFuture<byte[]> future = submit(queryRequest);
+ if (_enableQueryCancellation) {
+ String queryId = queryRequest.getQueryId();
+ // Track the running query for cancellation.
+ if (LOGGER.isDebugEnabled()) {
Review Comment:
(minor) On the broker side, we are not doing this check. Will this check
add/reduce overhead?
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -105,6 +117,73 @@ public QueryScheduler(PinotConfiguration config,
QueryExecutor queryExecutor, Re
*/
public abstract ListenableFuture<byte[]> submit(ServerQueryRequest
queryRequest);
+ /**
+ * Submit a query for execution and track runtime context about the query
for things like cancellation.
+ * @param queryRequest query to schedule for execution
+ * @return Listenable future for query result representing serialized
response. Custom callbacks can be added on
+ * the future to clean up the runtime context tracked during query execution.
+ */
+ public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest)
{
+ ListenableFuture<byte[]> future = submit(queryRequest);
+ if (_enableQueryCancellation) {
+ String queryId = queryRequest.getQueryId();
+ // Track the running query for cancellation.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Keep track of running query: {}", queryId);
+ }
+ _queryFuturesById.put(queryId, future);
+ // And remove the track when the query ends.
+ Futures.addCallback(future, new FutureCallback<byte[]>() {
+ @Override
+ public void onSuccess(@Nullable byte[] ignored) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Remove track of running query: {} on success",
queryId);
+ }
+ _queryFuturesById.remove(queryId);
+ }
+
+ @Override
+ public void onFailure(Throwable ignored) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Remove track of running query: {} on failure",
queryId);
+ }
+ _queryFuturesById.remove(queryId);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ return future;
+ }
+
+ /**
+ * Cancel a query as identified by the queryId. This method is non-blocking
and the query may still run for a while
+ * after calling this method. This method can be called multiple times.
+ *
+ * @param queryId a unique Id to find the query
+ * @return true if a running query exists for the given queryId.
+ */
+ public boolean cancelQuery(String queryId) {
+ // Keep the future as it'll be cleaned up by the thread executing the
query.
+ Future<byte[]> future = _queryFuturesById.get(queryId);
+ if (future == null) {
+ return false;
+ }
+ boolean done = future.isDone();
+ if (!done) {
Review Comment:
(minor) I don't think this check is required. Cancel is no-op if a future is
done
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java:
##########
@@ -59,6 +60,32 @@ public static String getHelixInstanceId(Instance instance) {
return prefix + instance.getHost() + "_" + instance.getPort();
}
+ public static String getInstanceAdminEndpoint(InstanceConfig instanceConfig,
int defaultPort) {
+ // Backward-compatible with legacy hostname of format 'Server_<hostname>'
+ String hostname = instanceConfig.getHostName();
+ if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+ hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
+ }
+ return getInstanceAdminEndpoint(instanceConfig,
CommonConstants.HTTP_PROTOCOL, hostname, defaultPort);
+ }
+
+ public static String getInstanceAdminEndpoint(InstanceConfig instanceConfig,
String defaultProtocol, String hostname,
Review Comment:
Any specific reason why we might want to take a hostname instead of
extracting it from the instance config?
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java:
##########
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * API to cancel query running on the server, given a queryId.
+ */
+@Api(tags = "Query", authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+ HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public class QueryResource {
+ @Inject
+ private ServerInstance _serverInstance;
+
+ @DELETE
+ @Path("/query/{queryId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Cancel a query running on the server as identified by
the queryId", notes = "No effect if "
+ + "no query exists for the given queryId. Query may continue to run for
a short while after calling cancel as "
+ + "it's done in a non-blocking manner. The cancel API can be called
multiple times.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error"),
+ @ApiResponse(code = 404, message = "Query not found running on the
server")
+ })
+ public String cancelQuery(
+ @ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>",
required = true) @PathParam("queryId")
+ String queryId) {
+ if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) {
+ return "Cancelled query: " + queryId;
+ }
+ throw new WebApplicationException(
+ Response.status(Response.Status.NOT_FOUND).entity("Query: " + queryId
+ " not found on the server").build());
+ }
+
+ @GET
+ @Path("/query/id")
Review Comment:
```suggestion
@Path("/queries/id")
```
##########
pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java:
##########
@@ -105,6 +117,73 @@ public QueryScheduler(PinotConfiguration config,
QueryExecutor queryExecutor, Re
*/
public abstract ListenableFuture<byte[]> submit(ServerQueryRequest
queryRequest);
+ /**
+ * Submit a query for execution and track runtime context about the query
for things like cancellation.
+ * @param queryRequest query to schedule for execution
+ * @return Listenable future for query result representing serialized
response. Custom callbacks can be added on
+ * the future to clean up the runtime context tracked during query execution.
+ */
+ public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest)
{
+ ListenableFuture<byte[]> future = submit(queryRequest);
+ if (_enableQueryCancellation) {
+ String queryId = queryRequest.getQueryId();
+ // Track the running query for cancellation.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Keep track of running query: {}", queryId);
+ }
+ _queryFuturesById.put(queryId, future);
+ // And remove the track when the query ends.
+ Futures.addCallback(future, new FutureCallback<byte[]>() {
+ @Override
+ public void onSuccess(@Nullable byte[] ignored) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Remove track of running query: {} on success",
queryId);
+ }
+ _queryFuturesById.remove(queryId);
+ }
+
+ @Override
+ public void onFailure(Throwable ignored) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Remove track of running query: {} on failure",
queryId);
+ }
+ _queryFuturesById.remove(queryId);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ return future;
+ }
+
+ /**
+ * Cancel a query as identified by the queryId. This method is non-blocking
and the query may still run for a while
+ * after calling this method. This method can be called multiple times.
+ *
+ * @param queryId a unique Id to find the query
+ * @return true if a running query exists for the given queryId.
+ */
+ public boolean cancelQuery(String queryId) {
+ // Keep the future as it'll be cleaned up by the thread executing the
query.
+ Future<byte[]> future = _queryFuturesById.get(queryId);
+ if (future == null) {
+ return false;
+ }
+ boolean done = future.isDone();
+ if (!done) {
+ future.cancel(true);
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Cancelled query: {} that's done: {}", queryId, done);
+ }
+ return true;
+ }
+
+ /**
+ * @return list of ids of the queries currently running on the server.
+ */
+ public Set<String> getRunningQueries() {
Review Comment:
(minor) `getRunningQueryIds()` to be more clear
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]