This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch test-async-query-execution in repository https://gitbox.apache.org/repos/asf/pinot.git
commit fb35321475a30612d1d346ee5ea500d65df2a14e Author: Jack Li(Analytics Engineering) <[email protected]> AuthorDate: Fri Nov 4 16:39:28 2022 -0700 Add sample code to show how pagination protocol works in broker code --- .../requesthandler/BaseBrokerRequestHandler.java | 129 +++++++++++++++------ .../tests/BaseClusterIntegrationTest.java | 2 +- 2 files changed, 97 insertions(+), 34 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java index 224f46c440..2b3a175167 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java @@ -21,6 +21,7 @@ package org.apache.pinot.broker.requesthandler; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import java.io.IOException; import java.net.InetAddress; import java.util.ArrayList; import java.util.Arrays; @@ -30,6 +31,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -273,6 +276,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { PinotQuery pinotQuery; try { // Parse the request + // TODO: consider adding the option from subquery if sqlNodeAndOptions isn't null? sqlNodeAndOptions = sqlNodeAndOptions != null ? sqlNodeAndOptions : RequestUtils.parseQuery(query, request); // Compile the request into PinotQuery compilationStartTimeNs = System.nanoTime(); @@ -356,6 +360,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REQUEST_COMPILATION, (compilationEndTimeNs - compilationStartTimeNs) + sqlNodeAndOptions.getParseTimeNs()); + // TODO: go through all the tables for authorization. // Second-stage table-level access control // TODO: Modify AccessControl interface to directly take PinotQuery BrokerRequest brokerRequest = CalciteSqlCompiler.convertToBrokerRequest(pinotQuery); @@ -371,6 +376,86 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { _brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION, System.nanoTime() - compilationEndTimeNs); + // Validate QPS quota + if (!_queryQuotaManager.acquire(tableName)) { + String errorMessage = + String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); + LOGGER.info(errorMessage); + requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); + return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); + } + + // Validate the request + try { + validateRequest(serverPinotQuery, _queryResponseLimit); + } catch (Exception e) { + LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); + requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); + return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); + } + + _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1); + _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length()); + + if (Boolean.parseBoolean(pinotQuery.getQueryOptions().get("pagination"))) { +// String tableName = TableNameBuilder.extractRawTableName(pinotQuery.getDataSource().getTableName()); + // Step 1: Generate a pointer. + // TODO: a. add a method to generate a ID + // b. replace the dummyInstanceId with a real one. +// int hash = ("dummyInstanceId" + requestId + System.currentTimeMillis()).hashCode(); + String pointer = rawTableName + "_" + "dummyInstanceId" + requestId + System.currentTimeMillis(); + + // Step 2: TODO invoke pagination query initialization API. + + // Step 3: Submit to query executor. + // TODO: use an pool based executor as the 2nd parameter below. + CompletableFuture.supplyAsync(() -> { + try { + return handleRequest(requestId, query, serverPinotQuery, brokerRequest, serverBrokerRequest, + compilationStartTimeNs, tableName, rawTableName, requesterIdentity, requestContext); + } catch (Exception e) { + throw new CompletionException(e); + } + }).thenApply(brokerResponseNative -> { + // Step 5: TODO invoke upload result API. + try { + System.out.println("Async query execution response: " + brokerResponseNative.toJsonString()); + return null; + } catch (IOException e) { + throw new CompletionException(e); + } + }).exceptionally(exception -> { + // Step 6: TODO Handle exception. + System.out.println(exception.getMessage()); + return null; + }); + + // Step 4: TODO Put pointer only to the response and return. + BrokerResponseNative brokerResponseNative = new BrokerResponseNative(); + + DataSchema dataSchema = + new DataSchema(new String[]{"pointer"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}); + Object[] objects = new Object[]{pointer}; + List<Object[]> rows = new ArrayList<>(); + rows.add(objects); + ResultTable resultTable = new ResultTable(dataSchema, rows); + brokerResponseNative.setResultTable(resultTable); + System.out.println("Submission response: " + brokerResponseNative.toJsonString()); + return brokerResponseNative; + } + + return handleRequest(requestId, query, serverPinotQuery, brokerRequest, serverBrokerRequest, compilationStartTimeNs, + tableName, rawTableName, requesterIdentity, requestContext); + } + + // Exclude query compilation + authorization. + private BrokerResponseNative handleRequest(long requestId, String query, PinotQuery serverPinotQuery, + BrokerRequest brokerRequest, BrokerRequest serverBrokerRequest, long compilationStartTimeNs, String tableName, + String rawTableName, @Nullable RequesterIdentity requesterIdentity, RequestContext requestContext) + throws Exception { + // Get the tables hit by the request String offlineTableName = null; String realtimeTableName = null; @@ -430,29 +515,6 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { handleApproximateFunctionOverride(serverPinotQuery); } - // Validate QPS quota - if (!_queryQuotaManager.acquire(tableName)) { - String errorMessage = - String.format("Request %d: %s exceeds query quota for table: %s", requestId, query, tableName); - LOGGER.info(errorMessage); - requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_QUOTA_EXCEEDED, 1); - return new BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR, errorMessage)); - } - - // Validate the request - try { - validateRequest(serverPinotQuery, _queryResponseLimit); - } catch (Exception e) { - LOGGER.info("Caught exception while validating request {}: {}, {}", requestId, query, e.getMessage()); - requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE); - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1); - return new BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR, e)); - } - - _brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.QUERIES, 1); - _brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.REQUEST_SIZE, query.length()); - // Prepare OFFLINE and REALTIME requests BrokerRequest offlineBrokerRequest = null; BrokerRequest realtimeBrokerRequest = null; @@ -520,7 +582,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { } if (offlineBrokerRequest == null && realtimeBrokerRequest == null) { - if (pinotQuery.isExplain()) { + if (serverPinotQuery.isExplain()) { // EXPLAIN PLAN results to show that query is evaluated exclusively by Broker. return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT; } @@ -528,8 +590,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Send empty response since we don't need to evaluate either offline or realtime request. BrokerResponseNative brokerResponse = BrokerResponseNative.empty(); // Extract source info from incoming request - _queryLogger.log(new QueryLogger.QueryLogParams( - requestId, query, requestContext, tableName, 0, new ServerStats(), + _queryLogger.log(new QueryLogger.QueryLogParams(requestId, query, requestContext, tableName, 0, new ServerStats(), brokerResponse, System.nanoTime(), requesterIdentity)); return brokerResponse; } @@ -640,7 +701,7 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // - Compile time function invocation // - Literal only queries // - Any rewrites - if (pinotQuery.isExplain()) { + if (serverPinotQuery.isExplain()) { // Update routing tables to only send request to offline servers for OFFLINE and HYBRID tables. // TODO: Assess if the Explain Plan Query should also be routed to REALTIME servers for HYBRID tables if (offlineRoutingTable != null) { @@ -670,9 +731,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { LOGGER.debug("Remove track of running query: {}", requestId); } } else { - brokerResponse = processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, - offlineRoutingTable, realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, - requestContext); + brokerResponse = + processBrokerRequest(requestId, brokerRequest, serverBrokerRequest, offlineBrokerRequest, offlineRoutingTable, + realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs, serverStats, requestContext); } brokerResponse.setExceptions(exceptions); @@ -696,9 +757,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { // Extract source info from incoming request _queryLogger.log( - new QueryLogger.QueryLogParams( - requestId, query, requestContext, tableName, numUnavailableSegments, serverStats, brokerResponse, - totalTimeMs, requesterIdentity)); + new QueryLogger.QueryLogParams(requestId, query, requestContext, tableName, numUnavailableSegments, serverStats, + brokerResponse, totalTimeMs, requesterIdentity)); return brokerResponse; } @@ -812,6 +872,9 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler { String subquery = subqueryLiteral.getStringValue(); BrokerResponseNative response = handleRequest(requestId, subquery, null, jsonRequest, requesterIdentity, requestContext); + +// handleRequest(requestId, subquery, null, brokerRequest, serverBrokerRequest, +// compilationStartTimeNs, tableName, rawTableName, requesterIdentity, requestContext); if (response.getExceptionsSize() != 0) { throw new RuntimeException("Caught exception while executing subquery: " + subquery); } diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index b133a27551..73f80d78cb 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -592,7 +592,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { } protected long getCurrentCountStarResult(String tableName) { - return getPinotConnection().execute("SELECT COUNT(*) FROM " + tableName).getResultSet(0).getLong(0); + return getPinotConnection().execute("SET pagination = true; SELECT COUNT(*) FROM " + tableName).getResultSet(0).getLong(0); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
