This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 43e1298e32 allow automatic tracing when a request is sampled by a
registered tracer (#8629)
43e1298e32 is described below
commit 43e1298e322f2e66adb47c54b8bdfd2b7e57a552
Author: Richard Startin <[email protected]>
AuthorDate: Wed May 4 09:35:19 2022 +0200
allow automatic tracing when a request is sampled by a registered tracer
(#8629)
---
.../requesthandler/BaseBrokerRequestHandler.java | 95 +++++++++++-----------
.../requesthandler/BrokerRequestHandler.java | 4 +-
.../requesthandler/GrpcBrokerRequestHandler.java | 15 ++--
.../SingleConnectionBrokerRequestHandler.java | 10 ++-
.../LiteralOnlyBrokerRequestTest.java | 8 +-
...tStatistics.java => DefaultRequestContext.java} | 4 +-
...{RequestStatistics.java => RequestContext.java} | 6 +-
.../org/apache/pinot/spi/trace/RequestScope.java | 2 +-
.../java/org/apache/pinot/spi/trace/Tracer.java | 2 +-
9 files changed, 78 insertions(+), 68 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 5caa1c89aa..dedd9ebf0e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -76,7 +76,7 @@ import
org.apache.pinot.spi.config.table.TimestampIndexGranularity;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.BadQueryRequestException;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
@@ -165,12 +165,12 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
@Override
public BrokerResponseNative handleRequest(JsonNode request, @Nullable
RequesterIdentity requesterIdentity,
- RequestStatistics requestStatistics)
+ RequestContext requestContext)
throws Exception {
long requestId = _requestIdGenerator.incrementAndGet();
- requestStatistics.setBrokerId(_brokerId);
- requestStatistics.setRequestId(requestId);
- requestStatistics.setRequestArrivalTimeMillis(System.currentTimeMillis());
+ requestContext.setBrokerId(_brokerId);
+ requestContext.setRequestId(requestId);
+ requestContext.setRequestArrivalTimeMillis(System.currentTimeMillis());
// First-stage access control to prevent unauthenticated requests from
using up resources. Secondary table-level
// check comes later.
@@ -178,7 +178,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
if (!hasAccess) {
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
1);
LOGGER.info("Access denied for requestId {}", requestId);
- requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
}
@@ -186,14 +186,14 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
if (sql == null) {
throw new BadQueryRequestException("Failed to find 'sql' in the request:
" + request);
}
- return handleRequest(requestId, sql.asText(), request, requesterIdentity,
requestStatistics);
+ return handleRequest(requestId, sql.asText(), request, requesterIdentity,
requestContext);
}
private BrokerResponseNative handleRequest(long requestId, String query,
JsonNode request,
- @Nullable RequesterIdentity requesterIdentity, RequestStatistics
requestStatistics)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
LOGGER.debug("SQL query for request {}: {}", requestId, query);
- requestStatistics.setQuery(query);
+ requestContext.setQuery(query);
// Compile the request
long compilationStartTimeNs = System.nanoTime();
@@ -203,7 +203,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
} catch (Exception e) {
LOGGER.info("Caught exception while compiling SQL request {}: {}, {}",
requestId, query, e.getMessage());
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_COMPILATION_EXCEPTIONS,
1);
- requestStatistics.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.SQL_PARSING_ERROR_CODE);
return new
BrokerResponseNative(QueryException.getException(QueryException.SQL_PARSING_ERROR,
e));
}
@@ -219,7 +219,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
// EXPLAIN PLAN results to show that query is evaluated exclusively
by Broker.
return BrokerResponseNative.BROKER_ONLY_EXPLAIN_PLAN_OUTPUT;
}
- return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs,
requestStatistics);
+ return processLiteralOnlyQuery(pinotQuery, compilationStartTimeNs,
requestContext);
} catch (Exception e) {
// TODO: refine the exceptions here to early termination the queries
won't requires to send to servers.
LOGGER.warn("Unable to execute literal request {}: {} at broker,
fallback to server query. {}", requestId,
@@ -228,18 +228,18 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
try {
- handleSubquery(pinotQuery, requestId, request, requesterIdentity,
requestStatistics);
+ handleSubquery(pinotQuery, requestId, request, requesterIdentity,
requestContext);
} catch (Exception e) {
LOGGER.info("Caught exception while handling the subquery in request {}:
{}, {}", requestId, query,
e.getMessage());
-
requestStatistics.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.QUERY_EXECUTION_ERROR_CODE);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
e));
}
String tableName =
getActualTableName(pinotQuery.getDataSource().getTableName());
setTableName(serverBrokerRequest, tableName);
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
- requestStatistics.setTableName(rawTableName);
+ requestContext.setTableName(rawTableName);
try {
boolean isCaseInsensitive = _tableCache.isCaseInsensitive();
@@ -252,7 +252,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
if (e instanceof BadQueryRequestException) {
LOGGER.info("Caught exception while checking column names in request,
{}: {}, {}", requestId, query,
e.getMessage());
-
requestStatistics.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.UNKNOWN_COLUMN_ERROR_CODE);
_brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.UNKNOWN_COLUMN_EXCEPTIONS, 1);
return new
BrokerResponseNative(QueryException.getException(QueryException.UNKNOWN_COLUMN_ERROR,
e));
}
@@ -279,7 +279,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
if (!hasTableAccess) {
_brokerMetrics.addMeteredTableValue(tableName,
BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR, 1);
LOGGER.info("Access denied for request {}: {}, table: {}", requestId,
query, tableName);
- requestStatistics.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.ACCESS_DENIED_ERROR_CODE);
return new BrokerResponseNative(QueryException.ACCESS_DENIED_ERROR);
}
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.AUTHORIZATION,
@@ -320,11 +320,11 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
// No table matches the request
if (realtimeTableConfig == null && offlineTableConfig == null) {
LOGGER.info("Table not found for request {}: {}", requestId, query);
-
requestStatistics.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
+
requestContext.setErrorCode(QueryException.TABLE_DOES_NOT_EXIST_ERROR_CODE);
return BrokerResponseNative.TABLE_DOES_NOT_EXIST;
}
LOGGER.info("No table matches for request {}: {}", requestId, query);
-
requestStatistics.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
+
requestContext.setErrorCode(QueryException.BROKER_RESOURCE_MISSING_ERROR_CODE);
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.RESOURCE_MISSING_EXCEPTIONS,
1);
return BrokerResponseNative.NO_TABLE_RESULT;
}
@@ -339,7 +339,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
String errorMessage =
String.format("Request %d: %s exceeds query quota for table: %s",
requestId, query, tableName);
LOGGER.info(errorMessage);
-
requestStatistics.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.TOO_MANY_REQUESTS_ERROR_CODE);
_brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.QUERY_QUOTA_EXCEEDED, 1);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUOTA_EXCEEDED_ERROR,
errorMessage));
}
@@ -349,7 +349,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
validateRequest(pinotQuery, _queryResponseLimit);
} catch (Exception e) {
LOGGER.info("Caught exception while validating request {}: {}, {}",
requestId, query, e.getMessage());
-
requestStatistics.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
+ requestContext.setErrorCode(QueryException.QUERY_VALIDATION_ERROR_CODE);
_brokerMetrics.addMeteredTableValue(rawTableName,
BrokerMeter.QUERY_VALIDATION_EXCEPTIONS, 1);
return new
BrokerResponseNative(QueryException.getException(QueryException.QUERY_VALIDATION_ERROR,
e));
}
@@ -373,9 +373,9 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
handleExpressionOverride(realtimePinotQuery,
_tableCache.getExpressionOverrideMap(realtimeTableName));
handleTimestampIndexOverride(realtimePinotQuery, realtimeTableConfig);
_queryOptimizer.optimize(realtimePinotQuery, realtimeTableConfig,
schema);
- requestStatistics.setFanoutType(RequestStatistics.FanoutType.HYBRID);
-
requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
-
requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+ requestContext.setFanoutType(RequestContext.FanoutType.HYBRID);
+ requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
+
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
} else if (offlineTableName != null) {
// OFFLINE only
setTableName(serverBrokerRequest, offlineTableName);
@@ -383,8 +383,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
handleTimestampIndexOverride(pinotQuery, offlineTableConfig);
_queryOptimizer.optimize(pinotQuery, offlineTableConfig, schema);
offlineBrokerRequest = serverBrokerRequest;
- requestStatistics.setFanoutType(RequestStatistics.FanoutType.OFFLINE);
-
requestStatistics.setOfflineServerTenant(getServerTenant(offlineTableName));
+ requestContext.setFanoutType(RequestContext.FanoutType.OFFLINE);
+ requestContext.setOfflineServerTenant(getServerTenant(offlineTableName));
} else {
// REALTIME only
setTableName(serverBrokerRequest, realtimeTableName);
@@ -392,8 +392,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
handleTimestampIndexOverride(pinotQuery, realtimeTableConfig);
_queryOptimizer.optimize(pinotQuery, realtimeTableConfig, schema);
realtimeBrokerRequest = serverBrokerRequest;
- requestStatistics.setFanoutType(RequestStatistics.FanoutType.REALTIME);
-
requestStatistics.setRealtimeServerTenant(getServerTenant(realtimeTableName));
+ requestContext.setFanoutType(RequestContext.FanoutType.REALTIME);
+
requestContext.setRealtimeServerTenant(getServerTenant(realtimeTableName));
}
// Check if response can be send without server query evaluation.
@@ -417,7 +417,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
// Send empty response since we don't need to evaluate either offline or
realtime request.
BrokerResponseNative brokerResponse = BrokerResponseNative.empty();
- logBrokerResponse(requestId, query, requestStatistics, brokerRequest, 0,
new ServerStats(), brokerResponse,
+ logBrokerResponse(requestId, query, requestContext, brokerRequest, 0,
new ServerStats(), brokerResponse,
System.nanoTime());
return brokerResponse;
@@ -471,7 +471,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
}
int numUnavailableSegments = unavailableSegments.size();
- requestStatistics.setNumUnavailableSegments(numUnavailableSegments);
+ requestContext.setNumUnavailableSegments(numUnavailableSegments);
List<ProcessingException> exceptions = new ArrayList<>();
if (numUnavailableSegments > 0) {
@@ -530,7 +530,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
}
BrokerResponseNative brokerResponse =
processBrokerRequest(requestId, brokerRequest, serverBrokerRequest,
offlineBrokerRequest, offlineRoutingTable,
- realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs,
serverStats, requestStatistics);
+ realtimeBrokerRequest, realtimeRoutingTable, remainingTimeMs,
serverStats, requestContext);
brokerResponse.setExceptions(exceptions);
long executionEndTimeNs = System.nanoTime();
_brokerMetrics.addPhaseTiming(rawTableName,
BrokerQueryPhase.QUERY_EXECUTION,
@@ -544,10 +544,10 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
// Set total query processing time
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(executionEndTimeNs -
compilationStartTimeNs);
brokerResponse.setTimeUsedMs(totalTimeMs);
- requestStatistics.setQueryProcessingTime(totalTimeMs);
- augmentStatistics(requestStatistics, brokerResponse);
+ requestContext.setQueryProcessingTime(totalTimeMs);
+ augmentStatistics(requestContext, brokerResponse);
- logBrokerResponse(requestId, query, requestStatistics, brokerRequest,
numUnavailableSegments, serverStats,
+ logBrokerResponse(requestId, query, requestContext, brokerRequest,
numUnavailableSegments, serverStats,
brokerResponse, totalTimeMs);
return brokerResponse;
}
@@ -627,7 +627,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
return TRUE.equals(brokerRequest.getPinotQuery().getFilterExpression());
}
- private void logBrokerResponse(long requestId, String query,
RequestStatistics requestStatistics,
+ private void logBrokerResponse(long requestId, String query, RequestContext
requestContext,
BrokerRequest brokerRequest, int numUnavailableSegments, ServerStats
serverStats,
BrokerResponseNative brokerResponse, long totalTimeMs) {
LOGGER.debug("Broker Response: {}", brokerResponse);
@@ -649,7 +649,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
brokerResponse.getNumConsumingSegmentsQueried(),
numUnavailableSegments,
brokerResponse.getMinConsumingFreshnessTimeMs(),
brokerResponse.getNumServersResponded(),
brokerResponse.getNumServersQueried(),
brokerResponse.isNumGroupsLimitReached(),
- requestStatistics.getReduceTimeMillis(),
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
+ requestContext.getReduceTimeMillis(),
brokerResponse.getExceptionsSize(), serverStats.getServerStats(),
brokerResponse.getOfflineTotalCpuTimeNs(),
brokerResponse.getOfflineThreadCpuTimeNs(),
brokerResponse.getOfflineSystemActivitiesCpuTimeNs(),
brokerResponse.getOfflineResponseSerializationCpuTimeNs(),
brokerResponse.getRealtimeTotalCpuTimeNs(),
@@ -687,11 +687,11 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
* <p>Currently only supports subquery within the filter.
*/
private void handleSubquery(PinotQuery pinotQuery, long requestId, JsonNode
jsonRequest,
- @Nullable RequesterIdentity requesterIdentity, RequestStatistics
requestStatistics)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
Expression filterExpression = pinotQuery.getFilterExpression();
if (filterExpression != null) {
- handleSubquery(filterExpression, requestId, jsonRequest,
requesterIdentity, requestStatistics);
+ handleSubquery(filterExpression, requestId, jsonRequest,
requesterIdentity, requestContext);
}
}
@@ -703,7 +703,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
* IN_ID_SET transform function.
*/
private void handleSubquery(Expression expression, long requestId, JsonNode
jsonRequest,
- @Nullable RequesterIdentity requesterIdentity, RequestStatistics
requestStatistics)
+ @Nullable RequesterIdentity requesterIdentity, RequestContext
requestContext)
throws Exception {
Function function = expression.getFunctionCall();
if (function == null) {
@@ -716,7 +716,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
Preconditions.checkState(subqueryLiteral != null, "Second argument of
IN_SUBQUERY must be a literal (subquery)");
String subquery = subqueryLiteral.getStringValue();
BrokerResponseNative response =
- handleRequest(requestId, subquery, jsonRequest, requesterIdentity,
requestStatistics);
+ handleRequest(requestId, subquery, jsonRequest, requesterIdentity,
requestContext);
if (response.getExceptionsSize() != 0) {
throw new RuntimeException("Caught exception while executing subquery:
" + subquery);
}
@@ -725,7 +725,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
operands.set(1, RequestUtils.getLiteralExpression(serializedIdSet));
} else {
for (Expression operand : operands) {
- handleSubquery(operand, requestId, jsonRequest, requesterIdentity,
requestStatistics);
+ handleSubquery(operand, requestId, jsonRequest, requesterIdentity,
requestContext);
}
}
}
@@ -1097,7 +1097,7 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
* Processes the literal only query.
*/
private BrokerResponseNative processLiteralOnlyQuery(PinotQuery pinotQuery,
long compilationStartTimeNs,
- RequestStatistics requestStatistics) {
+ RequestContext requestContext) {
BrokerResponseNative brokerResponse = new BrokerResponseNative();
List<String> columnNames = new ArrayList<>();
List<DataSchema.ColumnDataType> columnTypes = new ArrayList<>();
@@ -1114,8 +1114,8 @@ public abstract class BaseBrokerRequestHandler implements
BrokerRequestHandler {
long totalTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() -
compilationStartTimeNs);
brokerResponse.setTimeUsedMs(totalTimeMs);
- requestStatistics.setQueryProcessingTime(totalTimeMs);
- augmentStatistics(requestStatistics, brokerResponse);
+ requestContext.setQueryProcessingTime(totalTimeMs);
+ augmentStatistics(requestContext, brokerResponse);
return brokerResponse;
}
@@ -1516,13 +1516,12 @@ public abstract class BaseBrokerRequestHandler
implements BrokerRequestHandler {
* Processes the optimized broker requests for both OFFLINE and REALTIME
table.
*/
protected abstract BrokerResponseNative processBrokerRequest(long requestId,
BrokerRequest originalBrokerRequest,
- BrokerRequest serverBrokerRequest, @Nullable BrokerRequest
offlineBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
- @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long
timeoutMs, ServerStats serverStats,
- RequestStatistics requestStatistics)
+ BrokerRequest serverBrokerRequest, @Nullable BrokerRequest
offlineBrokerRequest, @Nullable Map<ServerInstance,
+ List<String>> offlineRoutingTable, @Nullable BrokerRequest
realtimeBrokerRequest, @Nullable Map<ServerInstance,
+ List<String>> realtimeRoutingTable, long timeoutMs, ServerStats
serverStats, RequestContext requestContext)
throws Exception;
- private static void augmentStatistics(RequestStatistics statistics,
BrokerResponse response) {
+ private static void augmentStatistics(RequestContext statistics,
BrokerResponse response) {
statistics.setTotalDocs(response.getTotalDocs());
statistics.setNumDocsScanned(response.getNumDocsScanned());
statistics.setNumEntriesScannedInFilter(response.getNumEntriesScannedInFilter());
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 786aa8b324..93591dee71 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -23,7 +23,7 @@ import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.broker.api.RequesterIdentity;
import org.apache.pinot.common.response.BrokerResponse;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
@ThreadSafe
@@ -34,6 +34,6 @@ public interface BrokerRequestHandler {
void shutDown();
BrokerResponse handleRequest(JsonNode request, @Nullable RequesterIdentity
requesterIdentity,
- RequestStatistics requestStatistics)
+ RequestContext requestContext)
throws Exception;
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index e2c7cf40b3..468add98c2 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -41,7 +41,7 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
/**
@@ -81,18 +81,20 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
protected BrokerResponseNative processBrokerRequest(long requestId,
BrokerRequest originalBrokerRequest,
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest
offlineBrokerRequest, @Nullable Map<ServerInstance,
List<String>> offlineRoutingTable, @Nullable BrokerRequest
realtimeBrokerRequest, @Nullable Map<ServerInstance,
- List<String>> realtimeRoutingTable, long timeoutMs, ServerStats
serverStats, RequestStatistics requestStatistics)
+ List<String>> realtimeRoutingTable, long timeoutMs, ServerStats
serverStats, RequestContext requestContext)
throws Exception {
// TODO: Support failure detection
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap =
new HashMap<>();
if (offlineBrokerRequest != null) {
assert offlineRoutingTable != null;
- sendRequest(TableType.OFFLINE, offlineBrokerRequest,
offlineRoutingTable, responseMap);
+ sendRequest(TableType.OFFLINE, offlineBrokerRequest,
offlineRoutingTable, responseMap,
+ requestContext.isSampledRequest());
}
if (realtimeBrokerRequest != null) {
assert realtimeRoutingTable != null;
- sendRequest(TableType.REALTIME, realtimeBrokerRequest,
realtimeRoutingTable, responseMap);
+ sendRequest(TableType.REALTIME, realtimeBrokerRequest,
realtimeRoutingTable, responseMap,
+ requestContext.isSampledRequest());
}
return
_streamingReduceService.reduceOnStreamResponse(originalBrokerRequest,
responseMap, timeoutMs,
_brokerMetrics);
@@ -103,7 +105,7 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
*/
private void sendRequest(TableType tableType, BrokerRequest brokerRequest,
Map<ServerInstance, List<String>> routingTable,
- Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap)
{
+ Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap,
boolean trace) {
for (Map.Entry<ServerInstance, List<String>> routingEntry :
routingTable.entrySet()) {
ServerInstance serverInstance = routingEntry.getKey();
List<String> segments = routingEntry.getValue();
@@ -111,7 +113,8 @@ public class GrpcBrokerRequestHandler extends
BaseBrokerRequestHandler {
int port = serverInstance.getGrpcPort();
// TODO: enable throttling on per host bases.
Iterator<Server.ServerResponse> streamingResponse =
_streamingQueryClient.submit(serverHost, port,
- new
GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true));
+ new
GrpcRequestBuilder().setSegments(segments).setBrokerRequest(brokerRequest).setEnableStreaming(true)
+ .setEnableTrace(trace));
responseMap.put(serverInstance.toServerRoutingInstance(tableType,
ServerInstance.RoutingType.GRPC),
streamingResponse);
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index f16c12f893..2e399211cb 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -49,7 +49,8 @@ import org.apache.pinot.core.transport.ServerInstance;
import org.apache.pinot.core.transport.ServerResponse;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -95,9 +96,12 @@ public class SingleConnectionBrokerRequestHandler extends
BaseBrokerRequestHandl
BrokerRequest serverBrokerRequest, @Nullable BrokerRequest
offlineBrokerRequest,
@Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
@Nullable BrokerRequest realtimeBrokerRequest,
@Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long
timeoutMs, ServerStats serverStats,
- RequestStatistics requestStatistics)
+ RequestContext requestContext)
throws Exception {
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
+ if (requestContext.isSampledRequest()) {
+
serverBrokerRequest.getPinotQuery().putToQueryOptions(CommonConstants.Broker.Request.TRACE,
"true");
+ }
String rawTableName =
TableNameBuilder.extractRawTableName(originalBrokerRequest.getQuerySource().getTableName());
long scatterGatherStartTimeNs = System.nanoTime();
@@ -134,7 +138,7 @@ public class SingleConnectionBrokerRequestHandler extends
BaseBrokerRequestHandl
_brokerReduceService.reduceOnDataTable(originalBrokerRequest,
serverBrokerRequest, dataTableMap,
reduceTimeOutMs, _brokerMetrics);
final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
- requestStatistics.setReduceTimeNanos(reduceTimeNanos);
+ requestContext.setReduceTimeNanos(reduceTimeNanos);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE,
reduceTimeNanos);
brokerResponse.setNumServersQueried(numServersQueried);
diff --git
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
index bee32f9432..704d397c35 100644
---
a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
+++
b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/LiteralOnlyBrokerRequestTest.java
@@ -30,7 +30,7 @@ import org.apache.pinot.common.metrics.PinotMetricUtils;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.trace.RequestStatistics;
+import org.apache.pinot.spi.trace.RequestContext;
import org.apache.pinot.spi.trace.Tracing;
import org.apache.pinot.spi.utils.BytesUtils;
import org.apache.pinot.sql.parsers.CalciteSqlParser;
@@ -129,7 +129,7 @@ public class LiteralOnlyBrokerRequestTest {
RANDOM.nextBytes(randBytes);
String ranStr = BytesUtils.toHexString(randBytes);
JsonNode request = new
ObjectMapper().readTree(String.format("{\"sql\":\"SELECT %d, '%s'\"}", randNum,
ranStr));
- RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
+ RequestContext requestStats = Tracing.getTracer().createRequestScope();
BrokerResponseNative brokerResponse =
requestHandler.handleRequest(request, null, requestStats);
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0),
String.format("%d", randNum));
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnDataType(0),
@@ -154,7 +154,7 @@ public class LiteralOnlyBrokerRequestTest {
long currentTsMin = System.currentTimeMillis();
JsonNode request = new ObjectMapper().readTree(
"{\"sql\":\"SELECT now() as currentTs, fromDateTime('2020-01-01 UTC',
'yyyy-MM-dd z') as firstDayOf2020\"}");
- RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
+ RequestContext requestStats = Tracing.getTracer().createRequestScope();
BrokerResponseNative brokerResponse =
requestHandler.handleRequest(request, null, requestStats);
long currentTsMax = System.currentTimeMillis();
Assert.assertEquals(brokerResponse.getResultTable().getDataSchema().getColumnName(0),
"currentTs");
@@ -225,7 +225,7 @@ public class LiteralOnlyBrokerRequestTest {
ObjectMapper objectMapper = new ObjectMapper();
// Test 1: select constant
JsonNode request = objectMapper.readTree("{\"sql\":\"EXPLAIN PLAN FOR
SELECT 1.5, 'test'\"}");
- RequestStatistics requestStats = Tracing.getTracer().createRequestScope();
+ RequestContext requestStats = Tracing.getTracer().createRequestScope();
BrokerResponseNative brokerResponse =
requestHandler.handleRequest(request, null, requestStats);
checkExplainResultSchema(brokerResponse.getResultTable().getDataSchema(),
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
similarity index 98%
rename from
pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java
rename to
pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 649688b0f2..60e1a1cca8 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestStatistics.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
* This object can be used to publish the query processing statistics to a
stream for
* post-processing at a finer level than metrics.
*/
-public class DefaultRequestStatistics implements RequestScope {
+public class DefaultRequestContext implements RequestScope {
private static final String DEFAULT_TABLE_NAME = "NotYetParsed";
@@ -64,7 +64,7 @@ public class DefaultRequestStatistics implements RequestScope
{
private FanoutType _fanoutType;
private int _numUnavailableSegments;
- public DefaultRequestStatistics() {
+ public DefaultRequestContext() {
}
@Override
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
similarity index 97%
rename from
pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java
rename to pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index 9f47d348b0..89f768608b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestStatistics.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -18,7 +18,7 @@
*/
package org.apache.pinot.spi.trace;
-public interface RequestStatistics {
+public interface RequestContext {
long getOfflineSystemActivitiesCpuTimeNs();
void setOfflineSystemActivitiesCpuTimeNs(long
offlineSystemActivitiesCpuTimeNs);
@@ -51,6 +51,10 @@ public interface RequestStatistics {
long getRequestId();
+ default boolean isSampledRequest() {
+ return false;
+ }
+
long getRequestArrivalTimeMillis();
long getReduceTimeMillis();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
index 450ad20e23..3d1c7b052f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestScope.java
@@ -22,5 +22,5 @@ package org.apache.pinot.spi.trace;
* A scope wrapping an end to end synchronous pinot request.
* Can be extended by a custom tracer to meter request latency.
*/
-public interface RequestScope extends Scope, RequestStatistics {
+public interface RequestScope extends Scope, RequestContext {
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
index bc448d0461..bf07b6f828 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracer.java
@@ -48,7 +48,7 @@ public interface Tracer {
* @return the request record
*/
default RequestScope createRequestScope() {
- return new DefaultRequestStatistics();
+ return new DefaultRequestContext();
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]