This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 36be75f1dd Add more information in RequestContext class (#11708)
36be75f1dd is described below
commit 36be75f1dd9525e844a4a2bbd545ea84b54bd609
Author: Pratik Tibrewal <[email protected]>
AuthorDate: Sun Oct 15 23:25:08 2023 +0530
Add more information in RequestContext class (#11708)
---
.../requesthandler/BaseBrokerRequestHandler.java | 13 ++
.../MultiStageBrokerRequestHandler.java | 2 +
.../SingleConnectionBrokerRequestHandler.java | 1 +
.../pinot/spi/trace/DefaultRequestContext.java | 168 ++++++++++++++++++++-
.../org/apache/pinot/spi/trace/RequestContext.java | 60 ++++++++
5 files changed, 240 insertions(+), 4 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 47ffeddedf..49ae0d9d94 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -1747,6 +1747,19 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
statistics.setOfflineTotalCpuTimeNs(response.getOfflineTotalCpuTimeNs());
statistics.setRealtimeTotalCpuTimeNs(response.getRealtimeTotalCpuTimeNs());
statistics.setNumRowsResultSet(response.getNumRowsResultSet());
+ statistics.setNumConsumingSegmentsQueried(response.getNumConsumingSegmentsQueried());
+ statistics.setNumConsumingSegmentsProcessed(response.getNumConsumingSegmentsProcessed());
+ statistics.setNumConsumingSegmentsMatched(response.getNumConsumingSegmentsMatched());
+ statistics.setMinConsumingFreshnessTimeMs(response.getMinConsumingFreshnessTimeMs());
+ statistics.setNumSegmentsPrunedByBroker(response.getNumSegmentsPrunedByBroker());
+ statistics.setNumSegmentsPrunedByServer(response.getNumSegmentsPrunedByServer());
+ statistics.setNumSegmentsPrunedInvalid(response.getNumSegmentsPrunedInvalid());
+ statistics.setNumSegmentsPrunedByLimit(response.getNumSegmentsPrunedByLimit());
+ statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
+ statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
+ statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
+ statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect(
+ Collectors.toList()));
}
private String getGlobalQueryId(long requestId) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 7fb9b24c14..5c4e86a7d4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -156,6 +156,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
Set<String> tableNames = queryPlanResult.getTableNames();
+ requestContext.setTableNames(List.copyOf(tableNames));
// Compilation Time. This includes the time taken for parsing, compiling, create stage plans and assigning workers.
long compilationEndTimeNs = System.nanoTime();
@@ -235,6 +236,7 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
sqlNodeAndOptions.getParseTimeNs() + (executionEndTimeNs - compilationStartTimeNs));
brokerResponse.setTimeUsedMs(totalTimeMs);
requestContext.setQueryProcessingTime(totalTimeMs);
+ requestContext.setTraceInfo(brokerResponse.getTraceInfo());
augmentStatistics(requestContext, brokerResponse);
// Log query and stats
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index 3ad8648442..0e0c35d318 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -144,6 +144,7 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
_brokerReduceService.reduceOnDataTable(originalBrokerRequest,
serverBrokerRequest, dataTableMap,
reduceTimeOutMs, _brokerMetrics);
final long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
+ requestContext.setTraceInfo(brokerResponse.getTraceInfo());
requestContext.setReduceTimeNanos(reduceTimeNanos);
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
index 60e1a1cca8..6ce063d253 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/DefaultRequestContext.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.spi.trace;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -32,7 +36,7 @@ public class DefaultRequestContext implements RequestScope {
private int _errorCode = 0;
private String _query;
- private String _tableName = DEFAULT_TABLE_NAME;
+ private List<String> _tableNames = new ArrayList<>();
private long _processingTimeMillis = -1;
private long _totalDocs;
private long _numDocsScanned;
@@ -63,6 +67,19 @@ public class DefaultRequestContext implements RequestScope {
private FanoutType _fanoutType;
private int _numUnavailableSegments;
+ private long _numConsumingSegmentsQueried;
+ private long _numConsumingSegmentsProcessed;
+ private long _numConsumingSegmentsMatched;
+ private long _minConsumingFreshnessTimeMs;
+ private long _numSegmentsPrunedByBroker;
+ private long _numSegmentsPrunedByServer;
+ private long _numSegmentsPrunedInvalid;
+ private long _numSegmentsPrunedByLimit;
+ private long _numSegmentsPrunedByValue;
+ private long _explainPlanNumEmptyFilterSegments;
+ private long _explainPlanNumMatchAllFilterSegments;
+ private Map<String, String> _traceInfo = new HashMap<>();
+ private List<String> _processingExceptions = new ArrayList<>();
public DefaultRequestContext() {
}
@@ -169,7 +186,12 @@ public class DefaultRequestContext implements RequestScope {
@Override
public void setTableName(String tableName) {
- _tableName = tableName;
+ _tableNames.add(tableName);
+ }
+
+ @Override
+ public void setTableNames(List<String> tableNames) {
+ _tableNames.addAll(tableNames);
}
@Override
@@ -239,7 +261,15 @@ public class DefaultRequestContext implements RequestScope {
@Override
public String getTableName() {
- return _tableName;
+ if (_tableNames.size() == 0) {
+ return DEFAULT_TABLE_NAME;
+ }
+ return _tableNames.get(0);
+ }
+
+ @Override
+ public List<String> getTableNames() {
+ return _tableNames;
}
@Override
@@ -314,7 +344,7 @@ public class DefaultRequestContext implements RequestScope {
@Override
public boolean hasValidTableName() {
- return !DEFAULT_TABLE_NAME.equals(_tableName);
+ return !_tableNames.isEmpty();
}
@Override
@@ -402,6 +432,136 @@ public class DefaultRequestContext implements RequestScope {
_reduceTimeMillis = reduceTimeMillis;
}
+ @Override
+ public long getNumConsumingSegmentsQueried() {
+ return _numConsumingSegmentsQueried;
+ }
+
+ @Override
+ public void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried) {
+ _numConsumingSegmentsQueried = numConsumingSegmentsQueried;
+ }
+
+ @Override
+ public long getNumConsumingSegmentsProcessed() {
+ return _numConsumingSegmentsProcessed;
+ }
+
+ @Override
+ public void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed) {
+ _numConsumingSegmentsProcessed = numConsumingSegmentsProcessed;
+ }
+
+ @Override
+ public long getNumConsumingSegmentsMatched() {
+ return _numConsumingSegmentsMatched;
+ }
+
+ @Override
+ public void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched) {
+ _numConsumingSegmentsMatched = numConsumingSegmentsMatched;
+ }
+
+ @Override
+ public long getMinConsumingFreshnessTimeMs() {
+ return _minConsumingFreshnessTimeMs;
+ }
+
+ @Override
+ public void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs) {
+ _minConsumingFreshnessTimeMs = minConsumingFreshnessTimeMs;
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByBroker() {
+ return _numSegmentsPrunedByBroker;
+ }
+
+ @Override
+ public void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker) {
+ _numSegmentsPrunedByBroker = numSegmentsPrunedByBroker;
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByServer() {
+ return _numSegmentsPrunedByServer;
+ }
+
+ @Override
+ public void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer) {
+ _numSegmentsPrunedByServer = numSegmentsPrunedByServer;
+ }
+
+ @Override
+ public long getNumSegmentsPrunedInvalid() {
+ return _numSegmentsPrunedInvalid;
+ }
+
+ @Override
+ public void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid) {
+ _numSegmentsPrunedInvalid = numSegmentsPrunedInvalid;
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByLimit() {
+ return _numSegmentsPrunedByLimit;
+ }
+
+ @Override
+ public void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit) {
+ _numSegmentsPrunedByLimit = numSegmentsPrunedByLimit;
+ }
+
+ @Override
+ public long getNumSegmentsPrunedByValue() {
+ return _numSegmentsPrunedByValue;
+ }
+
+ @Override
+ public void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue) {
+ _numSegmentsPrunedByValue = numSegmentsPrunedByValue;
+ }
+
+ @Override
+ public long getExplainPlanNumEmptyFilterSegments() {
+ return _explainPlanNumEmptyFilterSegments;
+ }
+
+ @Override
+ public void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments) {
+ _explainPlanNumEmptyFilterSegments = explainPlanNumEmptyFilterSegments;
+ }
+
+ @Override
+ public long getExplainPlanNumMatchAllFilterSegments() {
+ return _explainPlanNumMatchAllFilterSegments;
+ }
+
+ @Override
+ public void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments) {
+ _explainPlanNumMatchAllFilterSegments = explainPlanNumMatchAllFilterSegments;
+ }
+
+ @Override
+ public Map<String, String> getTraceInfo() {
+ return _traceInfo;
+ }
+
+ @Override
+ public void setTraceInfo(Map<String, String> traceInfo) {
+ _traceInfo.putAll(traceInfo);
+ }
+
+ @Override
+ public List<String> getProcessingExceptions() {
+ return _processingExceptions;
+ }
+
+ @Override
+ public void setProcessingExceptions(List<String> processingExceptions) {
+ _processingExceptions.addAll(processingExceptions);
+ }
+
@Override
public void close() {
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
index 89f768608b..f8ec35a921 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/RequestContext.java
@@ -18,6 +18,10 @@
*/
package org.apache.pinot.spi.trace;
+import java.util.List;
+import java.util.Map;
+
+
public interface RequestContext {
long getOfflineSystemActivitiesCpuTimeNs();
@@ -65,6 +69,8 @@ public interface RequestContext {
void setTableName(String tableName);
+ void setTableNames(List<String> tableNames);
+
void setQueryProcessingTime(long processingTimeMillis);
void setBrokerId(String brokerId);
@@ -93,6 +99,8 @@ public interface RequestContext {
String getTableName();
+ List<String> getTableNames();
+
long getProcessingTimeMillis();
long getTotalDocs();
@@ -157,6 +165,58 @@ public interface RequestContext {
void setReduceTimeMillis(long reduceTimeMillis);
+ long getNumConsumingSegmentsQueried();
+
+ void setNumConsumingSegmentsQueried(long numConsumingSegmentsQueried);
+
+ long getNumConsumingSegmentsProcessed();
+
+ void setNumConsumingSegmentsProcessed(long numConsumingSegmentsProcessed);
+
+ long getNumConsumingSegmentsMatched();
+
+ void setNumConsumingSegmentsMatched(long numConsumingSegmentsMatched);
+
+ long getMinConsumingFreshnessTimeMs();
+
+ void setMinConsumingFreshnessTimeMs(long minConsumingFreshnessTimeMs);
+
+ long getNumSegmentsPrunedByBroker();
+
+ void setNumSegmentsPrunedByBroker(long numSegmentsPrunedByBroker);
+
+ long getNumSegmentsPrunedByServer();
+
+ void setNumSegmentsPrunedByServer(long numSegmentsPrunedByServer);
+
+ long getNumSegmentsPrunedInvalid();
+
+ void setNumSegmentsPrunedInvalid(long numSegmentsPrunedInvalid);
+
+ long getNumSegmentsPrunedByLimit();
+
+ void setNumSegmentsPrunedByLimit(long numSegmentsPrunedByLimit);
+
+ long getNumSegmentsPrunedByValue();
+
+ void setNumSegmentsPrunedByValue(long numSegmentsPrunedByValue);
+
+ long getExplainPlanNumEmptyFilterSegments();
+
+ void setExplainPlanNumEmptyFilterSegments(long explainPlanNumEmptyFilterSegments);
+
+ long getExplainPlanNumMatchAllFilterSegments();
+
+ void setExplainPlanNumMatchAllFilterSegments(long explainPlanNumMatchAllFilterSegments);
+
+ Map<String, String> getTraceInfo();
+
+ void setTraceInfo(Map<String, String> traceInfo);
+
+ List<String> getProcessingExceptions();
+
+ void setProcessingExceptions(List<String> processingExceptions);
+
enum FanoutType {
OFFLINE, REALTIME, HYBRID
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]