This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3a655d2d0e support to show running queries and cancel query by id (#9171)
3a655d2d0e is described below
commit 3a655d2d0e86cf208a1d84e4d7c776fa093dbb2a
Author: Xiaobing <[email protected]>
AuthorDate: Wed Aug 17 11:29:46 2022 -0700
support to show running queries and cancel query by id (#9171)
---
.../broker/api/resources/PinotClientRequest.java | 63 ++++++++++
.../broker/broker/BrokerAdminApiApplication.java | 14 +++
.../requesthandler/BaseBrokerRequestHandler.java | 139 ++++++++++++++++++++-
.../requesthandler/BrokerRequestHandler.java | 19 +++
.../BrokerRequestHandlerDelegate.java | 19 +++
.../BaseBrokerRequestHandlerTest.java | 96 ++++++++++++++
...{MultiGetRequest.java => MultiHttpRequest.java} | 58 ++++++---
.../pinot/common/utils/config/InstanceUtils.java | 26 ++++
...tRequestTest.java => MultiHttpRequestTest.java} | 8 +-
.../pinot/controller/BaseControllerStarter.java | 3 +-
.../helix/core/PinotHelixResourceManager.java | 24 +---
.../controller/util/CompletionServiceHelper.java | 4 +-
.../core/query/request/ServerQueryRequest.java | 8 ++
.../pinot/core/query/scheduler/QueryScheduler.java | 86 ++++++++++++-
.../core/transport/InstanceRequestHandler.java | 2 +-
.../pinot/core/transport/ServerInstance.java | 9 ++
.../core/query/scheduler/QuerySchedulerTest.java | 76 +++++++++++
.../pinot/core/transport/QueryRoutingTest.java | 2 +-
.../pinot/server/api/resources/QueryResource.java | 100 +++++++++++++++
.../pinot/server/starter/ServerInstance.java | 4 +
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
21 files changed, 702 insertions(+), 59 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 5c7ce3299f..e1ccd67b2f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -33,10 +33,14 @@ import io.swagger.annotations.SecurityDefinition;
import io.swagger.annotations.SwaggerDefinition;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
@@ -46,6 +50,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.common.exception.QueryException;
@@ -84,6 +89,12 @@ public class PinotClientRequest {
@Inject
private BrokerMetrics _brokerMetrics;
+ @Inject
+ private Executor _executor;
+
+ @Inject
+ private HttpConnectionManager _httpConnMgr;
+
@GET
@ManagedAsync
@Produces(MediaType.APPLICATION_JSON)
@@ -141,6 +152,58 @@ public class PinotClientRequest {
}
}
+ /**
+ * Cancels the query with the given broker-assigned id by delegating to BrokerRequestHandler.cancelQuery().
+ * Returns a plain-text confirmation (with the per-server response codes appended when verbose=true), responds
+ * with 404 when the handler reports no running query for the id, and with 500 when the handler throws.
+ * NOTE(review): with BaseBrokerRequestHandler, "query cancellation is not enabled" is raised as an
+ * IllegalArgumentException and therefore surfaces as a 500 here; a 4xx may be more appropriate. The endpoint
+ * also declares APPLICATION_JSON while returning a plain string — confirm clients tolerate this.
+ */
+ @DELETE
+ @Path("query/{queryId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Cancel a query as identified by the queryId", notes =
"No effect if no query exists for the "
+ + "given queryId on the requested broker. Query may continue to run for
a short while after calling cancel as "
+ + "it's done in a non-blocking manner. The cancel method can be called
multiple times.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error"),
+ @ApiResponse(code = 404, message = "Query not found on the requested
broker")
+ })
+ public String cancelQuery(
+ @ApiParam(value = "QueryId as assigned by the broker", required = true)
@PathParam("queryId") long queryId,
+ @ApiParam(value = "Timeout for servers to respond the cancel request")
@QueryParam("timeoutMs")
+ @DefaultValue("3000") int timeoutMs,
+ @ApiParam(value = "Return server responses for troubleshooting")
@QueryParam("verbose") @DefaultValue("false")
+ boolean verbose) {
+ try {
+ // Only allocate the response map when verbose output was requested; passing null skips collecting responses.
+ Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
+ if (_requestHandler.cancelQuery(queryId, timeoutMs, _executor,
_httpConnMgr, serverResponses)) {
+ String resp = "Cancelled query: " + queryId;
+ if (verbose) {
+ resp += " with responses from servers: " + serverResponses;
+ }
+ return resp;
+ }
+ } catch (Exception e) {
+ throw new
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(String.format("Failed to cancel query: %s on the broker due
to error: %s", queryId, e.getMessage()))
+ .build());
+ }
+ // Reached only when cancelQuery() returned false, i.e. no running query is tracked under this id.
+ throw new WebApplicationException(
+
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not
found on the broker", queryId))
+ .build());
+ }
+
+ /**
+ * Lists the queries currently running via this broker, as reported by BrokerRequestHandler.getRunningQueries():
+ * the broker-assigned query id mapped to the query text. Any exception from the handler is reported as a 500.
+ */
+ @GET
+ @Path("queries")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get queryIds of the running queries submitted via the
requested broker", notes = "The id is "
+ + "assigned by the requested broker and only unique at the scope of this
broker")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error")
+ })
+ public Map<Long, String> getRunningQueries() {
+ try {
+ return _requestHandler.getRunningQueries();
+ } catch (Exception e) {
+ throw new
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity("Failed to get running queries on the broker due to error: "
+ e.getMessage()).build());
+ }
+ }
+
private BrokerResponse executeSqlQuery(ObjectNode sqlRequestJson,
HttpRequesterIdentity httpRequesterIdentity,
boolean onlyDql)
throws Exception {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
index fb8c4d6f0a..3978b65891 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/BrokerAdminApiApplication.java
@@ -18,11 +18,17 @@
*/
package org.apache.pinot.broker.broker;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.swagger.jaxrs.config.BeanConfig;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.pinot.broker.requesthandler.BrokerRequestHandler;
import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -57,9 +63,17 @@ public class BrokerAdminApiApplication extends ResourceConfig {
if
(brokerConf.getProperty(CommonConstants.Broker.BROKER_SERVICE_AUTO_DISCOVERY,
false)) {
register(ServiceAutoDiscoveryFeature.class);
}
+ ExecutorService executor =
+ Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
+ MultiThreadedHttpConnectionManager connMgr = new
MultiThreadedHttpConnectionManager();
+ connMgr.getParams().setConnectionTimeout((int) brokerConf
+ .getProperty(CommonConstants.Broker.CONFIG_OF_BROKER_TIMEOUT_MS,
+ CommonConstants.Broker.DEFAULT_BROKER_TIMEOUT_MS));
register(new AbstractBinder() {
@Override
protected void configure() {
+ bind(connMgr).to(HttpConnectionManager.class);
+ bind(executor).to(Executor.class);
bind(sqlQueryExecutor).to(SqlQueryExecutor.class);
bind(routingManager).to(BrokerRoutingManager.class);
bind(brokerRequestHandler).to(BrokerRequestHandler.class);
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 6afa20c95d..f5dba937ba 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -26,18 +26,26 @@ import com.google.common.util.concurrent.RateLimiter;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.URI;
+import org.apache.commons.httpclient.methods.DeleteMethod;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.broker.api.HttpRequesterIdentity;
import org.apache.pinot.broker.api.RequesterIdentity;
@@ -47,6 +55,7 @@ import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.http.MultiHttpRequest;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -126,6 +135,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
private final int _defaultHllLog2m;
private final boolean _enableQueryLimitOverride;
private final boolean _enableDistinctCountBitmapOverride;
+ private final Map<Long, QueryServers> _queriesById = new
ConcurrentHashMap<>();
+ private final boolean _enableQueryCancellation;
public BaseBrokerRequestHandler(PinotConfiguration config,
BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
@@ -154,9 +165,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
_numDroppedLog = new AtomicInteger(0);
_numDroppedLogRateLimiter = RateLimiter.create(1.0);
+ _enableQueryCancellation =
+
Boolean.parseBoolean(config.getProperty(Broker.CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION));
LOGGER.info(
- "Broker Id: {}, timeout: {}ms, query response limit: {}, query log
length: {}, query log max rate: {}qps",
- _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength,
_queryLogRateLimiter.getRate());
+ "Broker Id: {}, timeout: {}ms, query response limit: {}, query log
length: {}, query log max rate: {}qps, "
+ + "enabling query cancellation: {}",
+ _brokerId, _brokerTimeoutMs, _queryResponseLimit, _queryLogLength,
_queryLogRateLimiter.getRate(),
+ _enableQueryCancellation);
}
private String getDefaultBrokerId() {
@@ -168,6 +183,74 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
}
}
+ /**
+ * Returns a snapshot (a newly collected map) of the queries currently tracked for cancellation, keyed by the
+ * broker-assigned request id, with the query text as value. A query is tracked from just before it is sent to
+ * servers until handleRequest() returns.
+ * @throws IllegalArgumentException if query cancellation is not enabled on this broker
+ */
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation
is not enabled on broker");
+ return
_queriesById.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e
-> e.getValue()._query));
+ }
+
+ /**
+ * Test hook: returns the servers the given request was routed to, or an empty set if it is not tracked.
+ * Note that the returned set is the live, concurrently-updated tracking set rather than a copy.
+ * @throws IllegalArgumentException if query cancellation is not enabled on this broker
+ */
+ @VisibleForTesting
+ Set<ServerInstance> getRunningServers(long requestId) {
+ Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation
is not enabled on broker");
+ QueryServers queryServers = _queriesById.get(requestId);
+ return (queryServers == null) ? Collections.emptySet() :
queryServers._servers;
+ }
+
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses)
+ throws Exception {
+ Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation
is not enabled on broker");
+ QueryServers queryServers = _queriesById.get(queryId);
+ if (queryServers == null) {
+ return false;
+ }
+ String globalId = getGlobalQueryId(queryId);
+ List<String> serverUrls = new ArrayList<>();
+ for (ServerInstance server : queryServers._servers) {
+ serverUrls.add(String.format("%s/query/%s", server.getAdminEndpoint(),
globalId));
+ }
+ if (serverUrls.isEmpty()) {
+ LOGGER.debug("No servers running the query: {} right now", globalId);
+ return true;
+ }
+ LOGGER.debug("Cancelling the query: {} via server urls: {}", globalId,
serverUrls);
+ CompletionService<DeleteMethod> completionService =
+ new MultiHttpRequest(executor, connMgr).execute(serverUrls, null,
timeoutMs, "DELETE", DeleteMethod::new);
+ List<String> errMsgs = new ArrayList<>(serverUrls.size());
+ for (int i = 0; i < serverUrls.size(); i++) {
+ DeleteMethod deleteMethod = null;
+ try {
+ // Wait for all requests to respond before returning to be sure that
the servers have handled the cancel
+ // requests. The completion order is different from serverUrls, thus
use uri in the response.
+ deleteMethod = completionService.take().get();
+ URI uri = deleteMethod.getURI();
+ int status = deleteMethod.getStatusCode();
+ // Unexpected server responses are collected and returned as exception.
+ if (status != 200 && status != 404) {
+ throw new Exception(String.format("Unexpected status=%d and
response='%s' from uri='%s'", status,
+ deleteMethod.getResponseBodyAsString(), uri));
+ }
+ if (serverResponses != null) {
+ serverResponses.put(uri.getHost() + ":" + uri.getPort(), status);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to cancel query: {}", globalId, e);
+ // Can't just throw exception from here as there is a need to release
the other connections.
+ // So just collect the error msg to throw them together after the
for-loop.
+ errMsgs.add(e.getMessage());
+ } finally {
+ if (deleteMethod != null) {
+ deleteMethod.releaseConnection();
+ }
+ }
+ }
+ if (errMsgs.size() > 0) {
+ throw new Exception("Unexpected responses from servers: " +
StringUtils.join(errMsgs, ","));
+ }
+ return true;
+ }
+
@Override
public BrokerResponseNative handleRequest(JsonNode request, @Nullable
SqlNodeAndOptions sqlNodeAndOptions,
@Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
@@ -191,9 +274,16 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
if (sql == null) {
throw new BadQueryRequestException("Failed to find 'sql' in the request:
" + request);
}
- String query = sql.asText();
- requestContext.setQuery(query);
- return handleRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext);
+ try {
+ String query = sql.asText();
+ requestContext.setQuery(query);
+ return handleRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext);
+ } finally {
+ if (_enableQueryCancellation) {
+ _queriesById.remove(requestId);
+ LOGGER.debug("Remove track of running query: {}", requestId);
+ }
+ }
}
private BrokerResponseNative handleRequest(long requestId, String query,
@@ -576,6 +666,19 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
realtimeRoutingTable = null;
}
}
+ if (_enableQueryCancellation) {
+ // Start to track the running query for cancellation just before sending
it out to servers to avoid any potential
+ // failures that could happen before sending it out, like failures to
calculate the routing table etc.
+ // TODO: Even tracking the query as late as here, a potential race
condition between calling cancel API and
+ // query being sent out to servers can still happen. If cancel
request arrives earlier than query being
+ // sent out to servers, the servers miss the cancel request and
continue to run the queries. The users
+ // can always list the running queries and cancel query again
until it ends. Just that such race
+ // condition makes cancel API less reliable. This should be rare
as it assumes sending queries out to
+ // servers takes time, but will address later if needed.
+ QueryServers queryServers = _queriesById.computeIfAbsent(requestId, k ->
new QueryServers(query));
+ LOGGER.debug("Keep track of running query: {}", requestId);
+ queryServers.addServers(offlineRoutingTable, realtimeRoutingTable);
+ }
// TODO: Modify processBrokerRequest() to directly take PinotQuery
BrokerResponseNative brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest,
offlineBrokerRequest, offlineRoutingTable,
@@ -1650,6 +1753,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
}
+ // Request ids are only unique within one broker, so the id sent to servers is prefixed with the broker id.
+ private String getGlobalQueryId(long requestId) {
+ return _brokerId + "_" + requestId;
+ }
+
/**
* Helper class to pass the per server statistics.
*/
@@ -1664,4 +1771,26 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
_serverStats = serverStats;
}
}
+
+ /**
+ * Helper class to track the query plaintext and the requested servers.
+ * Instances live in _queriesById, keyed by broker request id, from just before the query is sent to servers
+ * until handleRequest() returns.
+ */
+ private static class QueryServers {
+ private final String _query;
+ // Backed by a ConcurrentHashMap so other threads (e.g. cancelQuery) can iterate it safely alongside addServers().
+ private final Set<ServerInstance> _servers = Collections.newSetFromMap(new
ConcurrentHashMap<>());
+
+ public QueryServers(String query) {
+ _query = query;
+ }
+
+ // Records the servers of both routing tables; either table may be null when the query has no such part.
+ public void addServers(Map<ServerInstance, List<String>>
offlineRoutingTable,
+ Map<ServerInstance, List<String>> realtimeRoutingTable) {
+ if (offlineRoutingTable != null) {
+ _servers.addAll(offlineRoutingTable.keySet());
+ }
+ if (realtimeRoutingTable != null) {
+ _servers.addAll(realtimeRoutingTable.keySet());
+ }
+ }
+ }
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 799bfe8295..de42134a6a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -19,8 +19,11 @@
package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
+import java.util.Map;
+import java.util.concurrent.Executor;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.spi.trace.RequestContext;
@@ -43,4 +46,20 @@ public interface BrokerRequestHandler {
throws Exception {
return handleRequest(request, null, requesterIdentity, requestContext);
}
+
+ /**
+ * Returns the queries currently running via this broker, keyed by the broker-assigned query id, with the query
+ * text as value.
+ */
+ Map<Long, String> getRunningQueries();
+
+ /**
+ * Cancel a query as identified by the queryId. This method is non-blocking
so the query may still run for a while
+ * after calling this method. This cancel method can be called multiple
times.
+ * @param queryId the unique Id assigned to the query by the broker
+ * @param timeoutMs timeout to wait for servers to respond the cancel
requests
+ * @param executor to send cancel requests to servers in parallel
+ * @param connMgr to provide the http connections
+ * @param serverResponses to collect cancel responses from all servers if a
map is provided
+ * (may be null to skip collecting them)
+ * @return true if there is a running query for the given queryId.
+ * @throws Exception if the cancellation fails, e.g. servers cannot be reached or respond unexpectedly (the
+ * exact conditions are implementation-specific)
+ */
+ boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses)
+ throws Exception;
}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index ba22f3f481..7a2a085273 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -20,7 +20,9 @@ package org.apache.pinot.broker.requesthandler;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.annotation.Nullable;
+import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.exception.QueryException;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -116,4 +118,21 @@ public class BrokerRequestHandlerDelegate implements BrokerRequestHandler {
}
return false;
}
+
+ /** Lists running queries; currently delegates only to the single-stage request handler. */
+ @Override
+ public Map<Long, String> getRunningQueries() {
+ // TODO: add support for multiStaged engine: track running queries for
multiStaged engine and combine its
+ // running queries with those from singleStaged engine. Both engines
share the same request Id generator, so
+ // the query will have unique ids across the two engines.
+ return _singleStageBrokerRequestHandler.getRunningQueries();
+ }
+
+ /** Cancels a query; currently delegates only to the single-stage request handler. */
+ @Override
+ public boolean cancelQuery(long queryId, int timeoutMs, Executor executor,
HttpConnectionManager connMgr,
+ Map<String, Integer> serverResponses)
+ throws Exception {
+ // TODO: add support for multiStaged engine, basically try to cancel the
query on multiStaged engine firstly; if
+ // not found, try on the singleStaged engine.
+ return _singleStageBrokerRequestHandler.cancelQuery(queryId, timeoutMs,
executor, connMgr, serverResponses);
+ }
}
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index f4e09259ff..cff4eff193 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -18,20 +18,46 @@
*/
package org.apache.pinot.broker.requesthandler;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
+import org.apache.pinot.broker.queryquota.QueryQuotaManager;
+import org.apache.pinot.broker.routing.BrokerRoutingManager;
import org.apache.pinot.common.config.provider.TableCache;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.common.request.Expression;
import org.apache.pinot.common.request.PinotQuery;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.routing.RoutingTable;
+import org.apache.pinot.core.transport.ServerInstance;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.apache.pinot.util.TestUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -156,4 +182,74 @@ public class BaseBrokerRequestHandlerTest {
Assert.assertEquals(BaseBrokerRequestHandler.getActualTableName("db.namespace.mytable",
tableCache),
"db.namespace.mytable");
}
+
+ /**
+ * Verifies that a submitted query is tracked while it runs: both its text and the servers it was routed to.
+ * It does not send actual cancel requests to servers.
+ */
+ @Test
+ public void testCancelQuery()
+ throws Exception {
+ String tableName = "myTable_OFFLINE";
+ // Mock pretty much everything until the query can be submitted.
+ TableCache tableCache = mock(TableCache.class);
+ TableConfig tableCfg = mock(TableConfig.class);
+ when(tableCache.getActualTableName(anyString())).thenReturn(tableName);
+ TenantConfig tenant = new TenantConfig("tier_BROKER", "tier_SERVER", null);
+ when(tableCfg.getTenantConfig()).thenReturn(tenant);
+ when(tableCache.getTableConfig(anyString())).thenReturn(tableCfg);
+ BrokerRoutingManager routingManager = mock(BrokerRoutingManager.class);
+ when(routingManager.routingExists(anyString())).thenReturn(true);
+ RoutingTable rt = mock(RoutingTable.class);
+ when(rt.getServerInstanceToSegmentsMap()).thenReturn(Collections
+ .singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
Collections.singletonList("segment01")));
+ when(routingManager.getRoutingTable(any())).thenReturn(rt);
+ QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
+ when(queryQuotaManager.acquire(anyString())).thenReturn(true);
+ // Blocks processBrokerRequest() below so the query stays tracked as running while the test inspects it.
+ CountDownLatch latch = new CountDownLatch(1);
+ PinotConfiguration config =
+ new
PinotConfiguration(Collections.singletonMap("pinot.broker.enable.query.cancellation",
"true"));
+ BaseBrokerRequestHandler requestHandler =
+ new BaseBrokerRequestHandler(config, routingManager, new
AllowAllAccessControlFactory(),
+ queryQuotaManager, tableCache,
+ new BrokerMetrics("", PinotMetricUtils.getPinotMetricsRegistry(),
true, Collections.emptySet())) {
+ @Override
+ public void start() {
+ }
+
+ @Override
+ public void shutDown() {
+ }
+
+ @Override
+ protected BrokerResponseNative processBrokerRequest(long requestId,
BrokerRequest originalBrokerRequest,
+ BrokerRequest serverBrokerRequest, @Nullable BrokerRequest
offlineBrokerRequest,
+ @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+ @Nullable BrokerRequest realtimeBrokerRequest,
+ @Nullable Map<ServerInstance, List<String>>
realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
+ RequestContext requestContext)
+ throws Exception {
+ latch.await();
+ return null;
+ }
+ };
+ // Submit the query asynchronously. NOTE(review): the returned future is never joined, so a failure inside the
+ // task only shows up indirectly, as the waitForCondition timeout below.
+ CompletableFuture.runAsync(() -> {
+ try {
+ JsonNode request = JsonUtils.stringToJsonNode(
+ String.format("{\"sql\":\"select * from %s limit
10\",\"queryOptions\":\"timeoutMs=10000\"}", tableName));
+ RequestContext requestStats = Tracing.getTracer().createRequestScope();
+ requestHandler.handleRequest(request, null, requestStats);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ // NOTE(review): assumes the handler assigns request id 1 to this first query; the id generator is not shown here.
+ TestUtils.waitForCondition((aVoid) ->
requestHandler.getRunningServers(1).size() == 1, 500, 5000,
+ "Failed to submit query");
+ Map.Entry<Long, String> entry =
requestHandler.getRunningQueries().entrySet().iterator().next();
+ Assert.assertEquals(entry.getKey().longValue(), 1);
+ Assert.assertTrue(entry.getValue().contains("select * from myTable_OFFLINE
limit 10"));
+ Set<ServerInstance> servers = requestHandler.getRunningServers(1);
+ Assert.assertEquals(servers.size(), 1);
+ Assert.assertEquals(servers.iterator().next().getHostname(), "server01");
+ Assert.assertEquals(servers.iterator().next().getPort(), 9000);
+ Assert.assertEquals(servers.iterator().next().getInstanceId(),
"server01_9000");
+ Assert.assertEquals(servers.iterator().next().getAdminEndpoint(),
"http://server01:8097");
+ // Release the blocked query so the async task can finish. NOTE(review): this is skipped if an assertion above
+ // fails, which leaves the async task blocked on the latch.
+ latch.countDown();
+ }
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
similarity index 60%
rename from pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
rename to pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
index 9f6e3f3158..f66c51d2e1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/http/MultiGetRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/http/MultiHttpRequest.java
@@ -23,9 +23,11 @@ import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
+import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpConnectionManager;
+import org.apache.commons.httpclient.HttpMethodBase;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.httpclient.params.HttpClientParams;
import org.slf4j.Logger;
@@ -33,23 +35,21 @@ import org.slf4j.LoggerFactory;
/**
- * Class to support multiple http GET operations in parallel by using
- * the executor that is passed in.
+ * Class to support multiple http operations in parallel by using the executor
that is passed in. This is a wrapper
+ * around Apache common HTTP client.
*
- * This is a wrapper around Apache common HTTP client.
+ * The execute method is re-usable but there is no real benefit to it. All the
connection management is handled by
+ * the input HttpConnectionManager. Note that we cannot use
SimpleHttpConnectionManager as it is not thread safe. Use
+ * MultiThreadedHttpConnectionManager as shown in the example below. As GET is
commonly used, there is a dedicated
+ * execute method for it. Other http methods like DELETE can use the generic
version of execute method.
*
- * The execute method is re-usable but there is no real benefit to it. All
- * the connection management is handled by the input HttpConnectionManager.
- * Note that we cannot use SimpleHttpConnectionManager as it is not thread
- * safe. Use MultiThreadedHttpConnectionManager as shown in the example
- * below
* Usage:
* <pre>
* {@code
* List<String> urls = Arrays.asList("http://www.linkedin.com",
"http://www.google.com");
- * MultiGetRequest mget = new
MultiGetRequest(Executors.newCachedThreadPool(),
+ * MultiHttpRequest mhr = new
MultiHttpRequest(Executors.newCachedThreadPool(),
* new MultiThreadedHttpConnectionManager());
- * CompletionService<GetMethod> completionService = mget.execute(urls);
+ * CompletionService<GetMethod> completionService = mhr.execute(urls,
headers, timeoutMs);
* for (int i = 0; i < urls.size(); i++) {
* GetMethod getMethod = null;
* try {
@@ -72,8 +72,8 @@ import org.slf4j.LoggerFactory;
* }
* </pre>
*/
-public class MultiGetRequest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MultiGetRequest.class);
+public class MultiHttpRequest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MultiHttpRequest.class);
private final Executor _executor;
// TODO: Verify that _connectionManager is an instaceOf
MultithreadedHttpConnectionManager.
@@ -84,7 +84,7 @@ public class MultiGetRequest {
* @param executor executor service to use for making parallel requests
* @param connectionManager http connection manager to use.
*/
- public MultiGetRequest(Executor executor, HttpConnectionManager
connectionManager) {
+ public MultiHttpRequest(Executor executor, HttpConnectionManager
connectionManager) {
_executor = executor;
_connectionManager = connectionManager;
}
@@ -99,24 +99,42 @@ public class MultiGetRequest {
*/
public CompletionService<GetMethod> execute(List<String> urls, @Nullable
Map<String, String> requestHeaders,
int timeoutMs) {
+ return execute(urls, requestHeaders, timeoutMs, "GET", GetMethod::new);
+ }
+
+ /**
+ * Execute certain http method on the urls in parallel using the executor
service.
+ * @param urls absolute URLs to execute the http method
+ * @param requestHeaders headers to set when making the request
+ * @param timeoutMs timeout in milliseconds for each http request
+ * @param httpMethodName the name of the http method like GET, DELETE etc.
+ * @param httpMethodSupplier a function to create a new http method object.
+ * @return instance of CompletionService. Completion service will provide
+ * results as they arrive. The order is NOT same as the order of URLs
+ */
+ public <T extends HttpMethodBase> CompletionService<T> execute(List<String>
urls,
+ @Nullable Map<String, String> requestHeaders, int timeoutMs, String
httpMethodName,
+ Function<String, T> httpMethodSupplier) {
HttpClientParams clientParams = new HttpClientParams();
clientParams.setConnectionManagerTimeout(timeoutMs);
HttpClient client = new HttpClient(clientParams, _connectionManager);
- CompletionService<GetMethod> completionService = new
ExecutorCompletionService<>(_executor);
+ CompletionService<T> completionService = new
ExecutorCompletionService<>(_executor);
for (String url : urls) {
completionService.submit(() -> {
try {
- GetMethod getMethod = new GetMethod(url);
+ T httpMethod = httpMethodSupplier.apply(url);
+ // Explicitly cast type downwards to workaround a bug in jdk8:
https://bugs.openjdk.org/browse/JDK-8056984
+ HttpMethodBase httpMethodBase = httpMethod;
if (requestHeaders != null) {
- requestHeaders.forEach(getMethod::setRequestHeader);
+ requestHeaders.forEach((k, v) ->
httpMethodBase.setRequestHeader(k, v));
}
- getMethod.getParams().setSoTimeout(timeoutMs);
- client.executeMethod(getMethod);
- return getMethod;
+ httpMethodBase.getParams().setSoTimeout(timeoutMs);
+ client.executeMethod(httpMethodBase);
+ return httpMethod;
} catch (Exception e) {
// Log only exception type and message instead of the whole stack
trace
- LOGGER.warn("Caught '{}' while executing GET on URL: {}",
e.toString(), url);
+ LOGGER.warn("Caught '{}' while executing: {} on URL: {}", e,
httpMethodName, url);
throw e;
}
});
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
index 41e88b5cf0..c840674390 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/InstanceUtils.java
@@ -26,6 +26,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.spi.config.instance.Instance;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -59,6 +60,31 @@ public class InstanceUtils {
return prefix + instance.getHost() + "_" + instance.getPort();
}
+ public static String getServerAdminEndpoint(InstanceConfig instanceConfig) {
+ // Backward-compatible with legacy hostname of format 'Server_<hostname>'
+ String hostname = instanceConfig.getHostName();
+ if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+ hostname = hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
+ }
+ return getServerAdminEndpoint(instanceConfig, hostname,
CommonConstants.HTTP_PROTOCOL);
+ }
+
+ public static String getServerAdminEndpoint(InstanceConfig instanceConfig,
String hostname, String defaultProtocol) {
+ String protocol = defaultProtocol;
+ int port = CommonConstants.Server.DEFAULT_ADMIN_API_PORT;
+ int adminPort =
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1);
+ int adminHttpsPort =
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1);
+ // NOTE: preference for insecure is sub-optimal, but required for
incremental upgrade scenarios
+ if (adminPort > 0) {
+ protocol = CommonConstants.HTTP_PROTOCOL;
+ port = adminPort;
+ } else if (adminHttpsPort > 0) {
+ protocol = CommonConstants.HTTPS_PROTOCOL;
+ port = adminHttpsPort;
+ }
+ return String.format("%s://%s:%d", protocol, hostname, port);
+ }
+
/**
* Returns the Helix InstanceConfig for the given instance.
*/
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
similarity index 96%
rename from
pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
rename to
pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
index 8cfd5a4cfe..95247eca13 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/http/MultiGetRequestTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/http/MultiHttpRequestTest.java
@@ -42,8 +42,8 @@ import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-public class MultiGetRequestTest {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MultiGetRequest.class);
+public class MultiHttpRequestTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MultiHttpRequest.class);
private static final String SUCCESS_MSG = "success";
private static final String ERROR_MSG = "error";
private static final String TIMEOUT_MSG = "Timeout";
@@ -106,8 +106,8 @@ public class MultiGetRequestTest {
@Test
public void testMultiGet() {
- MultiGetRequest mget =
- new MultiGetRequest(Executors.newCachedThreadPool(), new
MultiThreadedHttpConnectionManager());
+ MultiHttpRequest mget =
+ new MultiHttpRequest(Executors.newCachedThreadPool(), new
MultiThreadedHttpConnectionManager());
List<String> urls = Arrays.asList("http://localhost:" +
String.valueOf(_portStart) + URI_PATH,
"http://localhost:" + String.valueOf(_portStart + 1) + URI_PATH,
"http://localhost:" + String.valueOf(_portStart + 2) + URI_PATH,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index b52e86197d..9f4a35eb3c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -207,8 +207,9 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
// Do not use this before the invocation of {@link
PinotHelixResourceManager::start()}, which happens in {@link
// ControllerStarter::start()}
_helixResourceManager = new PinotHelixResourceManager(_config);
+ // This executor service is used to do async tasks from multiget util or
table rebalancing.
_executorService =
- Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setNameFormat("restapi-multiget-thread-%d").build());
+ Executors.newCachedThreadPool(new
ThreadFactoryBuilder().setNameFormat("async-task-thread-%d").build());
}
// Initialize the table config tuner registry.
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 4eba9a8d5b..9d061d290a 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -158,7 +158,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.InstanceTypeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -226,28 +225,7 @@ public class PinotHelixResourceManager {
public String load(String instanceId) {
InstanceConfig instanceConfig =
getHelixInstanceConfig(instanceId);
Preconditions.checkNotNull(instanceConfig, "Failed to find
instance config for: %s", instanceId);
- // Backward-compatible with legacy hostname of format
'Server_<hostname>'
- String hostname = instanceConfig.getHostName();
- if (hostname.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
- hostname =
hostname.substring(Helix.SERVER_INSTANCE_PREFIX_LENGTH);
- }
-
- String protocol = CommonConstants.HTTP_PROTOCOL;
- int port = Server.DEFAULT_ADMIN_API_PORT;
-
- int adminPort =
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_PORT_KEY, -1);
- int adminHttpsPort =
instanceConfig.getRecord().getIntField(Helix.Instance.ADMIN_HTTPS_PORT_KEY, -1);
-
- // NOTE: preference for insecure is sub-optimal, but required
for incremental upgrade scenarios
- if (adminPort > 0) {
- protocol = CommonConstants.HTTP_PROTOCOL;
- port = adminPort;
- } else if (adminHttpsPort > 0) {
- protocol = CommonConstants.HTTPS_PROTOCOL;
- port = adminHttpsPort;
- }
-
- return String.format("%s://%s:%d", protocol, hostname, port);
+ return InstanceUtils.getServerAdminEndpoint(instanceConfig);
}
});
_tableUpdaterLocks = new Object[DEFAULT_TABLE_UPDATER_LOCKERS_SIZE];
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
index 4a1bbb2623..8ec3b28238 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/CompletionServiceHelper.java
@@ -29,7 +29,7 @@ import javax.annotation.Nullable;
import org.apache.commons.httpclient.HttpConnectionManager;
import org.apache.commons.httpclient.URI;
import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.pinot.common.http.MultiGetRequest;
+import org.apache.pinot.common.http.MultiHttpRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,7 +79,7 @@ public class CompletionServiceHelper {
// TODO: use some service other than completion service so that we know
which server encounters the error
CompletionService<GetMethod> completionService =
- new MultiGetRequest(_executor,
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
+ new MultiHttpRequest(_executor,
_httpConnectionManager).execute(serverURLs, requestHeaders, timeoutMs);
for (int i = 0; i < serverURLs.size(); i++) {
GetMethod getMethod = null;
try {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 09bfa4ad2f..c272c7ffed 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -87,6 +87,14 @@ public class ServerQueryRequest {
_timerContext = new TimerContext(_queryContext.getTableName(),
serverMetrics, queryArrivalTimeMs);
}
+ /**
+ * As _requestId can be same across brokers, so use _brokerId and _requestId
together to uniquely identify a query.
+ * @return unique query Id within a pinot cluster.
+ */
+ public String getQueryId() {
+ return _brokerId + "_" + _requestId;
+ }
+
public long getRequestId() {
return _requestId;
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
index 069a84337c..f55cdb35f9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/scheduler/QueryScheduler.java
@@ -19,12 +19,18 @@
package org.apache.pinot.core.query.scheduler;
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListenableFutureTask;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.RateLimiter;
+import java.util.HashSet;
import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAccumulator;
@@ -62,6 +68,7 @@ public abstract class QueryScheduler {
private static final String INVALID_NUM_RESIZES = "-1";
private static final String INVALID_RESIZE_TIME_MS = "-1";
private static final String QUERY_LOG_MAX_RATE_KEY =
"query.log.maxRatePerSecond";
+ private static final String ENABLE_QUERY_CANCELLATION_KEY =
"enable.query.cancellation";
private static final double DEFAULT_QUERY_LOG_MAX_RATE = 10_000d;
protected final ServerMetrics _serverMetrics;
protected final QueryExecutor _queryExecutor;
@@ -70,8 +77,9 @@ public abstract class QueryScheduler {
private final RateLimiter _queryLogRateLimiter;
private final RateLimiter _numDroppedLogRateLimiter;
private final AtomicInteger _numDroppedLogCounter;
+ private final boolean _enableQueryCancellation;
protected volatile boolean _isRunning = false;
-
+ private final Map<String, Future<byte[]>> _queryFuturesById = new
ConcurrentHashMap<>();
/**
* Constructor to initialize QueryScheduler
* @param queryExecutor QueryExecutor engine to use
@@ -93,8 +101,12 @@ public abstract class QueryScheduler {
_queryLogRateLimiter =
RateLimiter.create(config.getProperty(QUERY_LOG_MAX_RATE_KEY,
DEFAULT_QUERY_LOG_MAX_RATE));
_numDroppedLogRateLimiter = RateLimiter.create(1.0d);
_numDroppedLogCounter = new AtomicInteger(0);
-
LOGGER.info("Query log max rate: {}", _queryLogRateLimiter.getRate());
+
+ _enableQueryCancellation =
Boolean.parseBoolean(config.getProperty(ENABLE_QUERY_CANCELLATION_KEY));
+ if (_enableQueryCancellation) {
+ LOGGER.info("Enable query cancellation");
+ }
}
/**
@@ -105,6 +117,76 @@ public abstract class QueryScheduler {
*/
public abstract ListenableFuture<byte[]> submit(ServerQueryRequest
queryRequest);
+ /**
+ * Submit a query for execution and track runtime context about the query
for things like cancellation.
+ * @param queryRequest query to schedule for execution
+ * @return Listenable future for query result representing serialized
response. Custom callbacks can be added on
+ * the future to clean up the runtime context tracked during query execution.
+ */
+ public ListenableFuture<byte[]> submitQuery(ServerQueryRequest queryRequest)
{
+ ListenableFuture<byte[]> future = submit(queryRequest);
+ if (_enableQueryCancellation) {
+ String queryId = queryRequest.getQueryId();
+ // Track the running query for cancellation.
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Keep track of running query: {}", queryId);
+ }
+ _queryFuturesById.put(queryId, future);
+ // And remove the track when the query ends.
+ Futures.addCallback(future, new FutureCallback<byte[]>() {
+ @Override
+ public void onSuccess(@Nullable byte[] ignored) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Remove track of running query: {} on success",
queryId);
+ }
+ _queryFuturesById.remove(queryId);
+ }
+
+ @Override
+ public void onFailure(Throwable ignored) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Remove track of running query: {} on failure",
queryId);
+ }
+ _queryFuturesById.remove(queryId);
+ }
+ }, MoreExecutors.directExecutor());
+ }
+ return future;
+ }
+
+ /**
+ * Cancel a query as identified by the queryId. This method is non-blocking
and the query may still run for a while
+ * after calling this method. This method can be called multiple times.
+ * TODO: refine the errmsg when query is cancelled, instead of bubbling up
the executor's CancellationException.
+ *
+ * @param queryId a unique Id to find the query
+ * @return true if a running query exists for the given queryId.
+ */
+ public boolean cancelQuery(String queryId) {
+ Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation
is not enabled on server");
+ // Keep the future as it'll be cleaned up by the thread executing the
query.
+ Future<byte[]> future = _queryFuturesById.get(queryId);
+ if (future == null) {
+ return false;
+ }
+ boolean done = future.isDone();
+ if (!done) {
+ future.cancel(true);
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Cancelled query: {} that's done: {}", queryId, done);
+ }
+ return true;
+ }
+
+ /**
+ * @return list of ids of the queries currently running on the server.
+ */
+ public Set<String> getRunningQueryIds() {
+ Preconditions.checkArgument(_enableQueryCancellation, "Query cancellation
is not enabled on server");
+ return new HashSet<>(_queryFuturesById.keySet());
+ }
+
/**
* Query scheduler name for logging
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
index af841eea5b..31d0c8d536 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/InstanceRequestHandler.java
@@ -120,7 +120,7 @@ public class InstanceRequestHandler extends
SimpleChannelInboundHandler<ByteBuf>
tableNameWithType = queryRequest.getTableNameWithType();
// Submit query for execution and register callback for execution
results.
- Futures.addCallback(_queryScheduler.submit(queryRequest),
+ Futures.addCallback(_queryScheduler.submitQuery(queryRequest),
createCallback(ctx, tableNameWithType, queryArrivalTimeMs,
instanceRequest, queryRequest),
MoreExecutors.directExecutor());
} catch (Exception e) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
index 149c1b0559..8809e4a015 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerInstance.java
@@ -22,7 +22,9 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.utils.config.InstanceUtils;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -43,6 +45,7 @@ public class ServerInstance {
private final int _queryServicePort;
private final int _queryMailboxPort;
+ private final String _adminEndpoint;
/**
* By default (auto joined instances), server instance name is of format:
{@code Server_<hostname>_<port>}, e.g.
@@ -75,6 +78,7 @@ public class ServerInstance {
INVALID_PORT);
_queryMailboxPort =
instanceConfig.getRecord().getIntField(Helix.Instance.MULTI_STAGE_QUERY_ENGINE_MAILBOX_PORT_KEY,
INVALID_PORT);
+ _adminEndpoint = InstanceUtils.getServerAdminEndpoint(instanceConfig,
_hostname, CommonConstants.HTTP_PROTOCOL);
}
@VisibleForTesting
@@ -86,6 +90,7 @@ public class ServerInstance {
_nettyTlsPort = INVALID_PORT;
_queryServicePort = INVALID_PORT;
_queryMailboxPort = INVALID_PORT;
+ _adminEndpoint = null;
}
public String getInstanceId() {
@@ -100,6 +105,10 @@ public class ServerInstance {
return _port;
}
+ public String getAdminEndpoint() {
+ return _adminEndpoint;
+ }
+
public int getGrpcPort() {
return _grpcPort;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java
new file mode 100644
index 0000000000..b2bf14a1f2
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/scheduler/QuerySchedulerTest.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.scheduler;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListenableFutureTask;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.LongAccumulator;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.core.query.executor.QueryExecutor;
+import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+public class QuerySchedulerTest {
+ @Test
+ public void testCancelQuery() {
+ PinotConfiguration config = new PinotConfiguration();
+ config.setProperty("enable.query.cancellation", "true");
+ QueryScheduler qs = createQueryScheduler(config);
+ Set<String> queryIds = new HashSet<>();
+ queryIds.add("foo");
+ queryIds.add("bar");
+ queryIds.add("baz");
+ for (String id : queryIds) {
+ ServerQueryRequest query = mock(ServerQueryRequest.class);
+ when(query.getQueryId()).thenReturn(id);
+ qs.submitQuery(query);
+ }
+ Assert.assertEquals(qs.getRunningQueryIds(), queryIds);
+ for (String id : queryIds) {
+ qs.cancelQuery(id);
+ }
+ Assert.assertTrue(qs.getRunningQueryIds().isEmpty());
+ Assert.assertFalse(qs.cancelQuery("unknown"));
+ }
+
+ private QueryScheduler createQueryScheduler(PinotConfiguration config) {
+ return new QueryScheduler(config, mock(QueryExecutor.class),
mock(ResourceManager.class), mock(ServerMetrics.class),
+ new LongAccumulator(Long::max, 0)) {
+ @Override
+ public ListenableFuture<byte[]> submit(ServerQueryRequest queryRequest) {
+ // Create a FutureTask does nothing but waits to be cancelled and
trigger callbacks.
+ return ListenableFutureTask.create(() -> null);
+ }
+
+ @Override
+ public String name() {
+ return "noop";
+ }
+ };
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 01191ce79d..618223cf39 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -70,7 +70,7 @@ public class QueryRoutingTest {
private QueryScheduler mockQueryScheduler(int responseDelayMs, byte[]
responseBytes) {
QueryScheduler queryScheduler = mock(QueryScheduler.class);
- when(queryScheduler.submit(any())).thenAnswer(invocation -> {
+ when(queryScheduler.submitQuery(any())).thenAnswer(invocation -> {
Thread.sleep(responseDelayMs);
return Futures.immediateFuture(responseBytes);
});
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
new file mode 100644
index 0000000000..b6c83ca94b
--- /dev/null
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/QueryResource.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiKeyAuthDefinition;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Authorization;
+import io.swagger.annotations.SecurityDefinition;
+import io.swagger.annotations.SwaggerDefinition;
+import java.util.Set;
+import javax.inject.Inject;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import org.apache.pinot.server.starter.ServerInstance;
+
+import static
org.apache.pinot.spi.utils.CommonConstants.SWAGGER_AUTHORIZATION_KEY;
+
+
+/**
+ * API to cancel query running on the server, given a queryId.
+ */
+@Api(tags = "Query", authorizations = {@Authorization(value =
SWAGGER_AUTHORIZATION_KEY)})
+@SwaggerDefinition(securityDefinition =
@SecurityDefinition(apiKeyAuthDefinitions = @ApiKeyAuthDefinition(name =
+ HttpHeaders.AUTHORIZATION, in =
ApiKeyAuthDefinition.ApiKeyLocation.HEADER, key = SWAGGER_AUTHORIZATION_KEY)))
+@Path("/")
+public class QueryResource {
+ @Inject
+ private ServerInstance _serverInstance;
+
+ @DELETE
+ @Path("/query/{queryId}")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Cancel a query running on the server as identified by
the queryId", notes = "No effect if "
+ + "no query exists for the given queryId. Query may continue to run for
a short while after calling cancel as "
+ + "it's done in a non-blocking manner. The cancel API can be called
multiple times.")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error"),
+ @ApiResponse(code = 404, message = "Query not found running on the
server")
+ })
+ public String cancelQuery(
+ @ApiParam(value = "QueryId as in the format of <brokerId>_<requestId>",
required = true) @PathParam("queryId")
+ String queryId) {
+ try {
+ if (_serverInstance.getQueryScheduler().cancelQuery(queryId)) {
+ return "Cancelled query: " + queryId;
+ }
+ } catch (Exception e) {
+ throw new
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(String.format("Failed to cancel query: %s on the server due
to error: %s", queryId, e.getMessage()))
+ .build());
+ }
+ throw new WebApplicationException(
+
Response.status(Response.Status.NOT_FOUND).entity(String.format("Query: %s not
found on the server", queryId))
+ .build());
+ }
+
+ @GET
+ @Path("/queries/id")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get queryIds of running queries on the server", notes
= "QueryIds are in the format of "
+ + "<brokerId>_<requestId>")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"), @ApiResponse(code = 500,
message = "Internal server error")
+ })
+ public Set<String> getRunningQueryIds() {
+ try {
+ return _serverInstance.getQueryScheduler().getRunningQueryIds();
+ } catch (Exception e) {
+ throw new
WebApplicationException(Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity("Failed to get queryIds of running queries on the server due
to error: " + e.getMessage()).build());
+ }
+ }
+}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
index 7ed0a57989..eff4a1cc05 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/ServerInstance.java
@@ -258,4 +258,8 @@ public class ServerInstance {
public long getLatestQueryTime() {
return _latestQueryTime.get();
}
+
+ public QueryScheduler getQueryScheduler() {
+ return _queryScheduler;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index c24702b0dc..ec2719b47e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -205,6 +205,7 @@ public class CommonConstants {
public static final int DEFAULT_BROKER_QUERY_LOG_LENGTH =
Integer.MAX_VALUE;
public static final String CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =
"pinot.broker.query.log.maxRatePerSecond";
+ public static final String CONFIG_OF_BROKER_ENABLE_QUERY_CANCELLATION =
"pinot.broker.enable.query.cancellation";
public static final double DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND =
10_000d;
public static final String CONFIG_OF_BROKER_TIMEOUT_MS =
"pinot.broker.timeoutMs";
public static final long DEFAULT_BROKER_TIMEOUT_MS = 10_000L;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]